diff --git a/ent/src/yb/master/catalog_manager.h b/ent/src/yb/master/catalog_manager.h index 1db8518cf054..a5f9655e76bb 100644 --- a/ent/src/yb/master/catalog_manager.h +++ b/ent/src/yb/master/catalog_manager.h @@ -238,6 +238,13 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon TabletInfos GetTabletInfos(const std::vector& ids) override; + Result CollectEntries( + const google::protobuf::RepeatedPtrField& tables, + bool add_indexes, + bool include_parent_colocated_table) override; + + server::Clock* Clock() override; + const Schema& schema() override; void Submit(std::unique_ptr operation) override; diff --git a/ent/src/yb/master/catalog_manager_ent.cc b/ent/src/yb/master/catalog_manager_ent.cc index 8f5d33f9b40e..426ab5720e70 100644 --- a/ent/src/yb/master/catalog_manager_ent.cc +++ b/ent/src/yb/master/catalog_manager_ent.cc @@ -379,19 +379,32 @@ void CatalogManager::Submit(std::unique_ptr operation) { tablet_peer()->Submit(std::move(operation), leader_ready_term()); } -Status CatalogManager::CreateTransactionAwareSnapshot( - const CreateSnapshotRequestPB& req, CreateSnapshotResponsePB* resp, rpc::RpcContext* rpc) { +Result CatalogManager::CollectEntries( + const google::protobuf::RepeatedPtrField& table_identifiers, + bool add_indexes, + bool include_parent_colocated_table) { SysRowEntries entries; - auto tables = VERIFY_RESULT(CollectTables(req.tables(), - req.add_indexes(), - true /* include_parent_colocated_table */)); + auto tables = VERIFY_RESULT(CollectTables( + table_identifiers, add_indexes, include_parent_colocated_table)); for (const auto& table : tables) { // TODO(txn_snapshot) use single lock to resolve all tables to tablets SnapshotInfo::AddEntries(table, entries.mutable_entries(), /* tablet_infos= */ nullptr); } + return entries; +} + +server::Clock* CatalogManager::Clock() { + return master_->clock(); +} + +Status CatalogManager::CreateTransactionAwareSnapshot( + const CreateSnapshotRequestPB& req, CreateSnapshotResponsePB* resp, rpc::RpcContext* rpc) { + SysRowEntries entries = VERIFY_RESULT(CollectEntries( + req.tables(), req.add_indexes(), true /* include_parent_colocated_table */)); + auto snapshot_id = VERIFY_RESULT(snapshot_coordinator_.Create( - entries, req.imported(), master_->clock()->MaxGlobalNow(), rpc->GetClientDeadline())); + entries, req.imported(), rpc->GetClientDeadline())); resp->set_snapshot_id(snapshot_id.data(), snapshot_id.size()); return Status::OK(); } diff --git a/ent/src/yb/master/master_backup.proto b/ent/src/yb/master/master_backup.proto index 98bb8d9dfb4b..a344513dccb6 100644 --- a/ent/src/yb/master/master_backup.proto +++ b/ent/src/yb/master/master_backup.proto @@ -51,6 +51,8 @@ message SysSnapshotEntryPB { optional fixed64 snapshot_hybrid_time = 4; optional int64 version = 5; + + optional bytes schedule_id = 6; } //////////////////////////////////////////////////////////// @@ -188,6 +190,7 @@ message ListSnapshotSchedulesRequestPB { message SnapshotScheduleInfoPB { optional bytes id = 1; optional SnapshotScheduleOptionsPB options = 2; + repeated SnapshotInfoPB snapshots = 3; } message ListSnapshotSchedulesResponsePB { diff --git a/src/yb/client/CMakeLists.txt b/src/yb/client/CMakeLists.txt index 6b9dec4a8d67..1c9fd753b771 100644 --- a/src/yb/client/CMakeLists.txt +++ b/src/yb/client/CMakeLists.txt @@ -106,7 +106,7 @@ target_link_libraries(yb_client_test_util gmock yb_client) -add_library(ql-dml-test-base ql-dml-test-base.cc txn-test-base.cc) +add_library(ql-dml-test-base ql-dml-test-base.cc txn-test-base.cc snapshot_test_base.cc) target_link_libraries(ql-dml-test-base integration-tests) # Tests diff --git a/src/yb/client/backup-txn-test.cc b/src/yb/client/backup-txn-test.cc index 7b5c74696deb..4a841f99bc6d 100644 --- a/src/yb/client/backup-txn-test.cc +++ b/src/yb/client/backup-txn-test.cc @@ -12,8 +12,8 @@ // #include "yb/client/session.h" +#include "yb/client/snapshot_test_base.h" #include "yb/client/transaction.h" -#include "yb/client/txn-test-base.h" #include "yb/common/transaction_error.h" @@ -45,13 +45,10 @@ DECLARE_uint64(snapshot_coordinator_poll_interval_ms); namespace yb { namespace client { -using Snapshots = google::protobuf::RepeatedPtrField; using ImportedSnapshotData = google::protobuf::RepeatedPtrField< master::ImportSnapshotMetaResponsePB::TableMetaPB>; -constexpr auto kWaitTimeout = 15s; - -class BackupTxnTest : public TransactionTestBase { +class BackupTxnTest : public SnapshotTestBase { protected: void SetUp() override { FLAGS_enable_history_cutoff_propagation = true; @@ -69,11 +66,6 @@ class BackupTxnTest : public TransactionTestBase { TransactionTestBase::DoBeforeTearDown(); } - master::MasterBackupServiceProxy MakeBackupServiceProxy() { - return master::MasterBackupServiceProxy( - &client_->proxy_cache(), cluster_->leader_mini_master()->bound_rpc_addr()); - } - Result StartSnapshot() { rpc::RpcController controller; controller.set_timeout(60s); @@ -86,20 +78,6 @@ class BackupTxnTest : public TransactionTestBase { return FullyDecodeTxnSnapshotId(resp.snapshot_id()); } - Result SnapshotState(const TxnSnapshotId& snapshot_id) { - auto snapshots = VERIFY_RESULT(ListSnapshots(snapshot_id)); - if (snapshots.size() != 1) { - return STATUS_FORMAT(RuntimeError, "Wrong number of snapshots, one expected but $0 found", - snapshots.size()); - } - LOG(INFO) << "Snapshot state: " << snapshots[0].ShortDebugString(); - return snapshots[0].entry().state(); - } - - Result IsSnapshotDone(const TxnSnapshotId& snapshot_id) { - return VERIFY_RESULT(SnapshotState(snapshot_id)) == SysSnapshotEntryPB::COMPLETE; - } - Result StartRestoration( const TxnSnapshotId& snapshot_id, HybridTime restore_at = HybridTime(), int64_t interval = 0) { @@ -147,98 +125,12 @@ class BackupTxnTest : public TransactionTestBase { }, kWaitTimeout * kTimeMultiplier, "Restoration done"); } - Result ListSnapshots( - const TxnSnapshotId& snapshot_id = TxnSnapshotId::Nil()) { - master::ListSnapshotsRequestPB req; - master::ListSnapshotsResponsePB resp; - - req.set_list_deleted_snapshots(true); - if (!snapshot_id.IsNil()) { - req.set_snapshot_id(snapshot_id.data(), snapshot_id.size()); - } - - rpc::RpcController controller; - controller.set_timeout(60s); - RETURN_NOT_OK(MakeBackupServiceProxy().ListSnapshots(req, &resp, &controller)); - if (resp.has_error()) { - return StatusFromPB(resp.error().status()); - } - LOG(INFO) << "Snapshots: " << resp.ShortDebugString(); - return std::move(resp.snapshots()); - } - - CHECKED_STATUS VerifySnapshot( - const TxnSnapshotId& snapshot_id, SysSnapshotEntryPB::State state) { - auto snapshots = VERIFY_RESULT(ListSnapshots()); - SCHECK_EQ(snapshots.size(), 1, IllegalState, "Wrong number of snapshots"); - const auto& snapshot = snapshots[0]; - auto listed_snapshot_id = VERIFY_RESULT(FullyDecodeTxnSnapshotId(snapshot.id())); - if (listed_snapshot_id != snapshot_id) { - return STATUS_FORMAT( - IllegalState, "Wrong snapshot id returned $0, expected $1", listed_snapshot_id, - snapshot_id); - } - if (snapshot.entry().state() != state) { - return STATUS_FORMAT( - IllegalState, "Wrong snapshot state: $0 vs $1", - SysSnapshotEntryPB::State_Name(snapshot.entry().state()), - SysSnapshotEntryPB::State_Name(state)); - } - size_t num_namespaces = 0, num_tables = 0, num_tablets = 0; - for (const auto& entry : snapshot.entry().entries()) { - switch (entry.type()) { - case master::SysRowEntry::TABLET: - ++num_tablets; - break; - case master::SysRowEntry::TABLE: - ++num_tables; - break; - case master::SysRowEntry::NAMESPACE: - ++num_namespaces; - break; - default: - return STATUS_FORMAT( - IllegalState, "Unexpected entry type: $0", - master::SysRowEntry::Type_Name(entry.type())); - } - } - SCHECK_EQ(num_namespaces, 1, IllegalState, "Wrong number of namespaces"); - SCHECK_EQ(num_tables, 1, IllegalState, "Wrong number of tables"); - SCHECK_EQ(num_tablets, table_.table()->GetPartitionCount(), IllegalState, - "Wrong number of tablets"); - - return Status::OK(); - } - Result CreateSnapshot() { TxnSnapshotId snapshot_id = VERIFY_RESULT(StartSnapshot()); RETURN_NOT_OK(WaitSnapshotDone(snapshot_id)); return snapshot_id; } - CHECKED_STATUS WaitSnapshotInState( - const TxnSnapshotId& snapshot_id, SysSnapshotEntryPB::State state, - MonoDelta duration = kWaitTimeout) { - auto state_name = SysSnapshotEntryPB::State_Name(state); - SysSnapshotEntryPB::State last_state = SysSnapshotEntryPB::UNKNOWN; - auto status = WaitFor([this, &snapshot_id, state, &last_state]() -> Result { - last_state = VERIFY_RESULT(SnapshotState(snapshot_id)); - return last_state == state; - }, duration * kTimeMultiplier, "Snapshot in state " + state_name); - - if (!status.ok() && status.IsTimedOut()) { - return STATUS_FORMAT( - IllegalState, "Wrong snapshot state: $0, while $1 expected", - SysSnapshotEntryPB::State_Name(last_state), state_name); - } - return status; - } - - CHECKED_STATUS WaitSnapshotDone( - const TxnSnapshotId& snapshot_id, MonoDelta duration = kWaitTimeout) { - return WaitSnapshotInState(snapshot_id, SysSnapshotEntryPB::COMPLETE, duration); - } - CHECKED_STATUS DeleteSnapshot(const TxnSnapshotId& snapshot_id) { master::DeleteSnapshotRequestPB req; master::DeleteSnapshotResponsePB resp; diff --git a/src/yb/client/snapshot-schedule-test.cc b/src/yb/client/snapshot-schedule-test.cc index 5bcf7b769c31..aa9a2034e4bb 100644 --- a/src/yb/client/snapshot-schedule-test.cc +++ b/src/yb/client/snapshot-schedule-test.cc @@ -11,30 +11,37 @@ // under the License. // -#include "yb/client/txn-test-base.h" +#include "yb/client/snapshot_test_base.h" #include "yb/master/master_backup.proxy.h" using namespace std::literals; +DECLARE_uint64(snapshot_coordinator_poll_interval_ms); + + namespace yb { namespace client { using Schedules = google::protobuf::RepeatedPtrField; +constexpr auto kSnapshotInterval = 10s * kTimeMultiplier; -class SnapshotScheduleTest : public TransactionTestBase { +class SnapshotScheduleTest : public SnapshotTestBase { public: - master::MasterBackupServiceProxy MakeBackupServiceProxy() { - return master::MasterBackupServiceProxy( - &client_->proxy_cache(), cluster_->leader_mini_master()->bound_rpc_addr()); + void SetUp() override { + FLAGS_snapshot_coordinator_poll_interval_ms = 250; + SnapshotTestBase::SetUp(); } Result CreateSchedule() { rpc::RpcController controller; controller.set_timeout(60s); master::CreateSnapshotScheduleRequestPB req; - req.mutable_options()->set_interval_sec(60); - req.mutable_options()->set_retention_duration_sec(1200); + auto& options = *req.mutable_options(); + options.set_interval_sec(std::chrono::seconds(kSnapshotInterval).count()); + options.set_retention_duration_sec(1200); + auto& tables = *options.mutable_filter()->mutable_tables()->mutable_tables(); + tables.Add()->set_table_id(table_.table()->id()); master::CreateSnapshotScheduleResponsePB resp; RETURN_NOT_OK(MakeBackupServiceProxy().CreateSnapshotSchedule(req, &resp, &controller)); return FullyDecodeSnapshotScheduleId(resp.snapshot_schedule_id()); @@ -86,5 +93,36 @@ TEST_F(SnapshotScheduleTest, Create) { } } +TEST_F(SnapshotScheduleTest, Snapshot) { + ASSERT_NO_FATALS(WriteData()); + auto schedule_id = ASSERT_RESULT(CreateSchedule()); + ASSERT_OK(WaitFor([this, schedule_id]() -> Result { + auto snapshots = VERIFY_RESULT(ListSnapshots()); + EXPECT_LE(snapshots.size(), 1); + LOG(INFO) << "Snapshots: " << AsString(snapshots); + for (const auto& snapshot : snapshots) { + EXPECT_EQ(TryFullyDecodeSnapshotScheduleId(snapshot.entry().schedule_id()), schedule_id); + if (snapshot.entry().state() == master::SysSnapshotEntryPB::COMPLETE) { + return true; + } + } + return false; + }, kSnapshotInterval / 2, "First snapshot")); + + auto schedules = ASSERT_RESULT(ListSchedules()); + ASSERT_EQ(schedules.size(), 1); + ASSERT_EQ(schedules[0].snapshots().size(), 1); + ASSERT_EQ(schedules[0].snapshots()[0].entry().state(), master::SysSnapshotEntryPB::COMPLETE); + + std::this_thread::sleep_for(kSnapshotInterval / 4); + auto snapshots = ASSERT_RESULT(ListSnapshots()); + ASSERT_EQ(snapshots.size(), 1); + + ASSERT_OK(WaitFor([this]() -> Result { + auto snapshots = VERIFY_RESULT(ListSnapshots()); + return snapshots.size() == 2; + }, kSnapshotInterval, "Second snapshot")); +} + } // namespace client } // namespace yb diff --git a/src/yb/client/snapshot_test_base.cc b/src/yb/client/snapshot_test_base.cc new file mode 100644 index 000000000000..d01c4eb6a4cf --- /dev/null +++ b/src/yb/client/snapshot_test_base.cc @@ -0,0 +1,128 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/client/snapshot_test_base.h" + +using namespace std::literals; + +namespace yb { +namespace client { + +master::MasterBackupServiceProxy SnapshotTestBase::MakeBackupServiceProxy() { + return master::MasterBackupServiceProxy( + &client_->proxy_cache(), cluster_->leader_mini_master()->bound_rpc_addr()); +} + +Result SnapshotTestBase::SnapshotState( + const TxnSnapshotId& snapshot_id) { + auto snapshots = VERIFY_RESULT(ListSnapshots(snapshot_id)); + if (snapshots.size() != 1) { + return STATUS_FORMAT(RuntimeError, "Wrong number of snapshots, one expected but $0 found", + snapshots.size()); + } + LOG(INFO) << "Snapshot state: " << snapshots[0].ShortDebugString(); + return snapshots[0].entry().state(); +} + +Result SnapshotTestBase::IsSnapshotDone(const TxnSnapshotId& snapshot_id) { + return VERIFY_RESULT(SnapshotState(snapshot_id)) == master::SysSnapshotEntryPB::COMPLETE; +} + +Result SnapshotTestBase::ListSnapshots(const TxnSnapshotId& snapshot_id) { + master::ListSnapshotsRequestPB req; + master::ListSnapshotsResponsePB resp; + + req.set_list_deleted_snapshots(true); + if (!snapshot_id.IsNil()) { + req.set_snapshot_id(snapshot_id.data(), snapshot_id.size()); + } + + rpc::RpcController controller; + controller.set_timeout(60s); + RETURN_NOT_OK(MakeBackupServiceProxy().ListSnapshots(req, &resp, &controller)); + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + LOG(INFO) << "Snapshots: " << resp.ShortDebugString(); + return std::move(resp.snapshots()); +} + +CHECKED_STATUS SnapshotTestBase::VerifySnapshot( + const TxnSnapshotId& snapshot_id, master::SysSnapshotEntryPB::State state) { + auto snapshots = VERIFY_RESULT(ListSnapshots()); + SCHECK_EQ(snapshots.size(), 1, IllegalState, "Wrong number of snapshots"); + const auto& snapshot = snapshots[0]; + auto listed_snapshot_id = VERIFY_RESULT(FullyDecodeTxnSnapshotId(snapshot.id())); + if (listed_snapshot_id != snapshot_id) { + return STATUS_FORMAT( + IllegalState, "Wrong snapshot id returned $0, expected $1", listed_snapshot_id, + snapshot_id); + } + if (snapshot.entry().state() != state) { + return STATUS_FORMAT( + IllegalState, "Wrong snapshot state: $0 vs $1", + master::SysSnapshotEntryPB::State_Name(snapshot.entry().state()), + master::SysSnapshotEntryPB::State_Name(state)); + } + size_t num_namespaces = 0, num_tables = 0, num_tablets = 0; + for (const auto& entry : snapshot.entry().entries()) { + switch (entry.type()) { + case master::SysRowEntry::TABLET: + ++num_tablets; + break; + case master::SysRowEntry::TABLE: + ++num_tables; + break; + case master::SysRowEntry::NAMESPACE: + ++num_namespaces; + break; + default: + return STATUS_FORMAT( + IllegalState, "Unexpected entry type: $0", + master::SysRowEntry::Type_Name(entry.type())); + } + } + SCHECK_EQ(num_namespaces, 1, IllegalState, "Wrong number of namespaces"); + SCHECK_EQ(num_tables, 1, IllegalState, "Wrong number of tables"); + SCHECK_EQ(num_tablets, table_.table()->GetPartitionCount(), IllegalState, + "Wrong number of tablets"); + + return Status::OK(); +} + +CHECKED_STATUS SnapshotTestBase::WaitSnapshotInState( + const TxnSnapshotId& snapshot_id, master::SysSnapshotEntryPB::State state, + MonoDelta duration) { + auto state_name = master::SysSnapshotEntryPB::State_Name(state); + master::SysSnapshotEntryPB::State last_state = master::SysSnapshotEntryPB::UNKNOWN; + auto status = WaitFor([this, &snapshot_id, state, &last_state]() -> Result { + last_state = VERIFY_RESULT(SnapshotState(snapshot_id)); + return last_state == state; + }, duration * kTimeMultiplier, "Snapshot in state " + state_name); + + if (!status.ok() && status.IsTimedOut()) { + return STATUS_FORMAT( + IllegalState, "Wrong snapshot state: $0, while $1 expected", + master::SysSnapshotEntryPB::State_Name(last_state), state_name); + } + return status; +} + +CHECKED_STATUS SnapshotTestBase::WaitSnapshotDone( + const TxnSnapshotId& snapshot_id, MonoDelta duration) { + return WaitSnapshotInState(snapshot_id, master::SysSnapshotEntryPB::COMPLETE, duration); +} + + +} // namespace client +} // namespace yb diff --git a/src/yb/client/snapshot_test_base.h b/src/yb/client/snapshot_test_base.h new file mode 100644 index 000000000000..740e99527853 --- /dev/null +++ b/src/yb/client/snapshot_test_base.h @@ -0,0 +1,46 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#ifndef YB_CLIENT_SNAPSHOT_TEST_BASE_H +#define YB_CLIENT_SNAPSHOT_TEST_BASE_H + +#include "yb/client/txn-test-base.h" + +#include "yb/master/master_backup.proxy.h" + +namespace yb { +namespace client { + +using Snapshots = google::protobuf::RepeatedPtrField; +constexpr auto kWaitTimeout = std::chrono::seconds(15); + +class SnapshotTestBase : public TransactionTestBase { + protected: + master::MasterBackupServiceProxy MakeBackupServiceProxy(); + + Result SnapshotState(const TxnSnapshotId& snapshot_id); + Result IsSnapshotDone(const TxnSnapshotId& snapshot_id); + Result ListSnapshots(const TxnSnapshotId& snapshot_id = TxnSnapshotId::Nil()); + CHECKED_STATUS VerifySnapshot( + const TxnSnapshotId& snapshot_id, master::SysSnapshotEntryPB::State state); + CHECKED_STATUS WaitSnapshotInState( + const TxnSnapshotId& snapshot_id, master::SysSnapshotEntryPB::State state, + MonoDelta duration = kWaitTimeout); + CHECKED_STATUS WaitSnapshotDone( + const TxnSnapshotId& snapshot_id, MonoDelta duration = kWaitTimeout); +}; + +} // namespace client +} // namespace yb + +#endif // YB_CLIENT_SNAPSHOT_TEST_BASE_H diff --git a/src/yb/common/hybrid_time.h b/src/yb/common/hybrid_time.h index 0dcaabb2549d..d2c95e308cdd 100644 --- a/src/yb/common/hybrid_time.h +++ b/src/yb/common/hybrid_time.h @@ -155,6 +155,10 @@ class HybridTime { return AddMicroseconds(millis * MonoTime::kMicrosecondsPerMillisecond); } + HybridTime AddSeconds(int64_t seconds) const { + return AddMicroseconds(seconds * MonoTime::kMicrosecondsPerSecond); + } + HybridTime AddDelta(MonoDelta delta) const { return AddMicroseconds(delta.ToMicroseconds()); } diff --git a/src/yb/master/master_snapshot_coordinator.cc b/src/yb/master/master_snapshot_coordinator.cc index 0a038f240352..94991da4dda0 100644 --- a/src/yb/master/master_snapshot_coordinator.cc +++ b/src/yb/master/master_snapshot_coordinator.cc @@ -15,6 +15,9 @@ #include +#include +#include + #include "yb/common/snapshot.h" #include "yb/common/transaction_error.h" @@ -95,8 +98,8 @@ auto MakeDoneCallback( return; } - it->second->Done(tablet_id, ResultToStatus(resp)); - post_process(it->second.get(), &lock); + (**it).Done(tablet_id, ResultToStatus(resp)); + post_process(it->get(), &lock); } }; @@ -109,31 +112,6 @@ auto MakeDoneCallback( }; } -auto SnapshotUpdater(SnapshotCoordinatorContext* context) { - struct UpdateFunctor { - SnapshotCoordinatorContext& context; - - void operator()(SnapshotState* snapshot, std::unique_lock* lock) const { - if (!snapshot->AllTabletsDone()) { - return; - } - docdb::KeyValueWriteBatchPB write_batch; - auto status = snapshot->StoreToWriteBatch(&write_batch); - if (!status.ok()) { - LOG(DFATAL) << "Failed to prepare write batch for snapshot: " << status; - return; - } - lock->unlock(); - - SubmitWrite(std::move(write_batch), &context); - } - }; - - return UpdateFunctor { - .context = *context - }; -} - } // namespace class MasterSnapshotCoordinator::Impl { @@ -142,31 +120,11 @@ class MasterSnapshotCoordinator::Impl { : context_(*context), poller_(std::bind(&Impl::Poll, this)) {} Result Create( - const SysRowEntries& entries, bool imported, HybridTime snapshot_hybrid_time, - CoarseTimePoint deadline) { + const SysRowEntries& entries, bool imported, CoarseTimePoint deadline) { auto synchronizer = std::make_shared(); - auto operation_state = std::make_unique(/* tablet= */ nullptr); - auto request = operation_state->AllocateRequest(); - - for (const auto& entry : entries.entries()) { - if (entry.type() == SysRowEntry::TABLET) { - request->add_tablet_id(entry.id()); - } - } - - request->set_snapshot_hybrid_time(snapshot_hybrid_time.ToUint64()); - request->set_operation(tserver::TabletSnapshotOpRequestPB::CREATE_ON_MASTER); - auto snapshot_id = TxnSnapshotId::GenerateRandom(); - request->set_snapshot_id(snapshot_id.data(), snapshot_id.size()); - request->set_imported(imported); - - request->mutable_extra_data()->PackFrom(entries); - - operation_state->set_completion_callback(std::make_unique< - tablet::WeakSynchronizerOperationCompletionCallback>(synchronizer)); - auto operation = std::make_unique(std::move(operation_state)); - - context_.Submit(std::move(operation)); + auto snapshot_id = VERIFY_RESULT(SubmitCreate( + entries, imported, SnapshotScheduleId::Nil(), TxnSnapshotId::Nil(), + std::make_unique(synchronizer))); RETURN_NOT_OK(synchronizer->WaitUntil(ToSteady(deadline))); return snapshot_id; @@ -186,13 +144,13 @@ class MasterSnapshotCoordinator::Impl { RETURN_NOT_OK(snapshot->StoreToWriteBatch(&write_batch)); { std::lock_guard lock(mutex_); - auto emplace_result = snapshots_.emplace(id, std::move(snapshot)); + auto emplace_result = snapshots_.emplace(std::move(snapshot)); if (!emplace_result.second) { return STATUS_FORMAT(IllegalState, "Duplicate snapshot id: $0", id); } if (leader_term >= 0) { - emplace_result.first->second->PrepareOperations(&operations); + (**emplace_result.first).PrepareOperations(&operations); } } @@ -282,12 +240,12 @@ class MasterSnapshotCoordinator::Impl { if (snapshot_id.IsNil()) { for (const auto& p : snapshots_) { if (!list_deleted) { - auto aggreaged_state = p.second->AggregatedState(); + auto aggreaged_state = p->AggregatedState(); if (aggreaged_state.ok() && *aggreaged_state == SysSnapshotEntryPB::DELETED) { continue; } } - RETURN_NOT_OK(p.second->ToPB(resp->add_snapshots())); + RETURN_NOT_OK(p->ToPB(resp->add_snapshots())); } return Status::OK(); } @@ -347,8 +305,8 @@ class MasterSnapshotCoordinator::Impl { std::lock_guard lock(mutex_); if (!restoration_id) { for (const auto& p : restorations_) { - if (!snapshot_id || p.second->snapshot_id() == snapshot_id) { - RETURN_NOT_OK(p.second->ToPB(resp->add_restorations())); + if (!snapshot_id || p->snapshot_id() == snapshot_id) { + RETURN_NOT_OK(p->ToPB(resp->add_restorations())); } } return Status::OK(); @@ -372,7 +330,7 @@ class MasterSnapshotCoordinator::Impl { auto restoration = std::make_unique(&context_, restoration_id, &snapshot); tablet_infos = restoration->PrepareOperations(); - restorations_.emplace(restoration_id, std::move(restoration)); + restorations_.emplace(std::move(restoration)); } auto snapshot_id_str = snapshot_id.AsSlice().ToBuffer(); @@ -404,13 +362,13 @@ class MasterSnapshotCoordinator::Impl { std::lock_guard lock(mutex_); if (snapshot_schedule_id.IsNil()) { for (const auto& p : schedules_) { - RETURN_NOT_OK(p.second->ToPB(resp->add_schedules())); + RETURN_NOT_OK(FillSchedule(*p, resp->add_schedules())); } return Status::OK(); } SnapshotScheduleState& schedule = VERIFY_RESULT(FindSnapshotSchedule(snapshot_schedule_id)); - return schedule.ToPB(resp->add_schedules()); + return FillSchedule(schedule, resp->add_schedules()); } void Start() { @@ -440,16 +398,16 @@ class MasterSnapshotCoordinator::Impl { REQUIRES(mutex_) { VLOG(1) << __func__ << "(" << id << ", " << data.ShortDebugString() << ")"; - auto new_entry = std::make_unique(&context_, id, data); + auto new_entry = std::make_unique(&context_, id, data); auto it = map->find(id); if (it == map->end()) { - map->emplace(id, std::move(new_entry)); - } else if (it->second->ShouldUpdate(*new_entry)) { - it->second = std::move(new_entry); + map->emplace(std::move(new_entry)); + } else if ((**it).ShouldUpdate(*new_entry)) { + map->replace(it, std::move(new_entry)); } else { LOG(INFO) << __func__ << " ignore because of version check, existing: " - << it->second->ToString() << ", loaded: " << new_entry->ToString(); + << (**it).ToString() << ", loaded: " << new_entry->ToString(); } return Status::OK(); @@ -461,7 +419,7 @@ class MasterSnapshotCoordinator::Impl { return STATUS(NotFound, "Could not find snapshot", snapshot_id.ToString(), MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND)); } - return *it->second; + return **it; } Result FindRestoration( @@ -471,7 +429,7 @@ class MasterSnapshotCoordinator::Impl { return STATUS(NotFound, "Could not find restoration", restoration_id.ToString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); } - return *it->second; + return **it; } Result FindSnapshotSchedule( @@ -481,7 +439,7 @@ class MasterSnapshotCoordinator::Impl { return STATUS(NotFound, "Could not find snapshot schedule", id.ToString(), MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND)); } - return *it->second; + return **it; } void ExecuteOperations(const TabletSnapshotOperations& operations) { @@ -506,7 +464,7 @@ class MasterSnapshotCoordinator::Impl { const TabletSnapshotOperation& operation, const TabletInfoPtr& tablet_info) { auto callback = MakeDoneCallback( &mutex_, snapshots_, operation.snapshot_id, operation.tablet_id, - SnapshotUpdater(&context_)); + std::bind(&Impl::UpdateSnapshot, this, _1, _2)); if (!tablet_info) { callback(STATUS_FORMAT(NotFound, "Tablet info not found for $0", operation.tablet_id)); return; @@ -531,20 +489,40 @@ class MasterSnapshotCoordinator::Impl { VLOG(4) << __func__ << "()"; std::vector cleanup_snapshots; TabletSnapshotOperations operations; + SnapshotScheduleOperations schedule_operations; { std::lock_guard lock(mutex_); for (const auto& p : snapshots_) { - if (p.second->NeedCleanup()) { - cleanup_snapshots.push_back(p.first); + if (p->NeedCleanup()) { + cleanup_snapshots.push_back(p->id()); } else { - p.second->PrepareOperations(&operations); + p->PrepareOperations(&operations); } } + auto now = context_.Clock()->Now(); + for (const auto& p : schedules_) { + p->PrepareOperations(LastSnapshotTime(p->id()), now, &schedule_operations); + } } for (const auto& id : cleanup_snapshots) { DeleteSnapshot(id); } ExecuteOperations(operations); + for (const auto& operation : schedule_operations) { + WARN_NOT_OK(ExecuteScheduleOperation(operation), + Format("Failed to execute operation on $0", operation.schedule_id)); + } + } + + HybridTime LastSnapshotTime(const SnapshotScheduleId& schedule_id) REQUIRES(mutex_) { + auto& index = snapshots_.get(); + auto it = index.upper_bound(schedule_id); + if (it == index.begin()) { + return HybridTime::kInvalid; + } + --it; + return (**it).schedule_id() == schedule_id ? (**it).snapshot_hybrid_time() + : HybridTime::kInvalid; } void DeleteSnapshot(const TxnSnapshotId& snapshot_id) { @@ -563,14 +541,177 @@ class MasterSnapshotCoordinator::Impl { SubmitWrite(std::move(write_batch), &context_); } + CHECKED_STATUS ExecuteScheduleOperation(const SnapshotScheduleOperation& operation) { + auto entries = VERIFY_RESULT(context_.CollectEntries( + operation.filter.tables().tables(), true, true)); + RETURN_NOT_OK(SubmitCreate( + entries, false, operation.schedule_id, operation.snapshot_id, + tablet::MakeFunctorOperationCompletionCallback( + [this, schedule_id = operation.schedule_id, snapshot_id = operation.snapshot_id]( + const Status& status) { + if (!status.ok()) { + CreateSnapshotAborted(status, schedule_id, snapshot_id); + } + }))); + return Status::OK(); + } + + void CreateSnapshotAborted( + const Status& status, const SnapshotScheduleId& schedule_id, + const TxnSnapshotId& snapshot_id) { + LOG(INFO) << __func__ << " for " << schedule_id << ", snapshot: " << snapshot_id + << ", status: " << status; + std::lock_guard lock(mutex_); + auto it = schedules_.find(schedule_id); + if (it == schedules_.end()) { + return; + } + (**it).SnapshotFinished(snapshot_id, status); + } + + Result SubmitCreate( + const SysRowEntries& entries, bool imported, const SnapshotScheduleId& schedule_id, + TxnSnapshotId snapshot_id, + std::unique_ptr completion_clbk) { + auto operation_state = std::make_unique(/* tablet= */ nullptr); + auto request = operation_state->AllocateRequest(); + + for (const auto& entry : entries.entries()) { + if (entry.type() == SysRowEntry::TABLET) { + request->add_tablet_id(entry.id()); + } + } + + request->set_snapshot_hybrid_time(context_.Clock()->MaxGlobalNow().ToUint64()); + request->set_operation(tserver::TabletSnapshotOpRequestPB::CREATE_ON_MASTER); + if (!snapshot_id) { + snapshot_id = TxnSnapshotId::GenerateRandom(); + } + request->set_snapshot_id(snapshot_id.data(), snapshot_id.size()); + request->set_imported(imported); + if (schedule_id) { + request->set_schedule_id(schedule_id.data(), schedule_id.size()); + } + + request->mutable_extra_data()->PackFrom(entries); + + operation_state->set_completion_callback(std::move(completion_clbk)); + auto operation = std::make_unique(std::move(operation_state)); + + context_.Submit(std::move(operation)); + + return snapshot_id; + } + + void UpdateSnapshot(SnapshotState* snapshot, std::unique_lock* lock) + REQUIRES(mutex_) { + if (!snapshot->AllTabletsDone()) { + return; + } + + if (snapshot->schedule_id()) { + UpdateSchedule(*snapshot); + } + + docdb::KeyValueWriteBatchPB write_batch; + auto status = snapshot->StoreToWriteBatch(&write_batch); + if (!status.ok()) { + LOG(DFATAL) << "Failed to prepare write batch for snapshot: " << status; + return; + } + lock->unlock(); + + SubmitWrite(std::move(write_batch), &context_); + }; + + void UpdateSchedule(const SnapshotState& snapshot) REQUIRES(mutex_) { + auto it = schedules_.find(snapshot.schedule_id()); + if (it == schedules_.end()) { + return; + } + + auto state = snapshot.AggregatedState(); + Status status; + if (!state.ok()) { + status = state.status(); + } else { + switch (*state) { + case SysSnapshotEntryPB::COMPLETE: + status = Status::OK(); + break; + case SysSnapshotEntryPB::FAILED: + status = snapshot.AnyFailure(); + break; + case SysSnapshotEntryPB::DELETED: + return; + default: + LOG(DFATAL) << "Unexpected snapshot state: " << *state << " for " << snapshot.id(); + return; + } + } + (**it).SnapshotFinished(snapshot.id(), status); + } + + CHECKED_STATUS FillSchedule(const SnapshotScheduleState& schedule, SnapshotScheduleInfoPB* out) + REQUIRES(mutex_) { + RETURN_NOT_OK(schedule.ToPB(out)); + const auto& index = snapshots_.get(); + auto p = index.equal_range(boost::make_tuple(schedule.id())); + for (auto i = p.first; i != p.second; ++i) { + RETURN_NOT_OK((**i).ToPB(out->add_snapshots())); + } + return Status::OK(); + } + SnapshotCoordinatorContext& context_; std::mutex mutex_; - std::unordered_map, - TxnSnapshotIdHash> snapshots_ GUARDED_BY(mutex_); - std::unordered_map, - TxnSnapshotRestorationIdHash> restorations_ GUARDED_BY(mutex_); - std::unordered_map, - SnapshotScheduleIdHash> schedules_ GUARDED_BY(mutex_); + class ScheduleTag; + using Snapshots = boost::multi_index_container< + std::unique_ptr, + boost::multi_index::indexed_by< + // Access snapshots by id. + boost::multi_index::hashed_unique< + boost::multi_index::const_mem_fun< + SnapshotState, const TxnSnapshotId&, &SnapshotState::id> + >, + // Group snapshots by schedule id. Ordered by hybrid time for the same schedule. + boost::multi_index::ordered_non_unique< + boost::multi_index::tag, + boost::multi_index::composite_key< + SnapshotState, + boost::multi_index::const_mem_fun< + SnapshotState, const SnapshotScheduleId&, &SnapshotState::schedule_id>, + boost::multi_index::const_mem_fun< + SnapshotState, HybridTime, &SnapshotState::snapshot_hybrid_time> + > + > + > + >; + // For restorations and schedules we have to use multi_index since there are template + // functions that expect same interface for those collections. + using Restorations = boost::multi_index_container< + std::unique_ptr, + boost::multi_index::indexed_by< + boost::multi_index::hashed_unique< + boost::multi_index::const_mem_fun< + RestorationState, const TxnSnapshotRestorationId&, + &RestorationState::restoration_id> + > + > + >; + using Schedules = boost::multi_index_container< + std::unique_ptr, + boost::multi_index::indexed_by< + boost::multi_index::hashed_unique< + boost::multi_index::const_mem_fun< + SnapshotScheduleState, const SnapshotScheduleId&, &SnapshotScheduleState::id> + > + > + >; + + Snapshots snapshots_ GUARDED_BY(mutex_); + Restorations restorations_ GUARDED_BY(mutex_); + Schedules schedules_ GUARDED_BY(mutex_); rpc::Poller poller_; }; @@ -580,9 +721,8 @@ MasterSnapshotCoordinator::MasterSnapshotCoordinator(SnapshotCoordinatorContext* MasterSnapshotCoordinator::~MasterSnapshotCoordinator() {} Result MasterSnapshotCoordinator::Create( - const SysRowEntries& entries, bool imported, HybridTime snapshot_hybrid_time, - CoarseTimePoint deadline) { - return impl_->Create(entries, imported, snapshot_hybrid_time, deadline); + const SysRowEntries& entries, bool imported, CoarseTimePoint deadline) { + return impl_->Create(entries, imported, deadline); } Status MasterSnapshotCoordinator::CreateReplicated( diff --git a/src/yb/master/master_snapshot_coordinator.h b/src/yb/master/master_snapshot_coordinator.h index 281f183ffbfe..7a5aad6c7f0d 100644 --- a/src/yb/master/master_snapshot_coordinator.h +++ b/src/yb/master/master_snapshot_coordinator.h @@ -40,8 +40,7 @@ class MasterSnapshotCoordinator : public tablet::SnapshotCoordinator { ~MasterSnapshotCoordinator(); Result Create( - const SysRowEntries& entries, bool imported, HybridTime snapshot_hybrid_time, - CoarseTimePoint deadline); + const SysRowEntries& entries, bool imported, CoarseTimePoint deadline); CHECKED_STATUS Delete(const TxnSnapshotId& snapshot_id, CoarseTimePoint deadline); diff --git a/src/yb/master/snapshot_coordinator_context.h b/src/yb/master/snapshot_coordinator_context.h index 1e013fe8e594..82db7d37739c 100644 --- a/src/yb/master/snapshot_coordinator_context.h +++ b/src/yb/master/snapshot_coordinator_context.h @@ -24,6 +24,8 @@ #include "yb/rpc/rpc_fwd.h" +#include "yb/server/server_fwd.h" + #include "yb/tablet/tablet_fwd.h" #include "yb/tserver/tserver_fwd.h" @@ -56,6 +58,11 @@ class SnapshotCoordinatorContext { const scoped_refptr& tablet, const std::string& snapshot_id, TabletSnapshotOperationCallback callback) = 0; + virtual Result CollectEntries( + const google::protobuf::RepeatedPtrField& tables, + bool add_indexes, + bool include_parent_colocated_table) = 0; + virtual const Schema& schema() = 0; virtual void Submit(std::unique_ptr operation) = 0; @@ -64,6 +71,8 @@ class SnapshotCoordinatorContext { virtual bool IsLeader() = 0; + virtual server::Clock* Clock() = 0; + virtual ~SnapshotCoordinatorContext() = default; }; diff --git a/src/yb/master/snapshot_schedule_state.cc b/src/yb/master/snapshot_schedule_state.cc index 3a5ae79b75fa..e6f9927ff0e2 100644 --- a/src/yb/master/snapshot_schedule_state.cc +++ b/src/yb/master/snapshot_schedule_state.cc @@ -46,7 +46,7 @@ Status SnapshotScheduleState::StoreToWriteBatch(docdb::KeyValueWriteBatchPB* out return Status::OK(); } -Status SnapshotScheduleState::ToPB(SnapshotScheduleInfoPB* pb) { +Status SnapshotScheduleState::ToPB(SnapshotScheduleInfoPB* pb) const { pb->set_id(id_.data(), id_.size()); *pb->mutable_options() = options_; return Status::OK(); @@ -56,5 +56,27 @@ std::string SnapshotScheduleState::ToString() const { return YB_CLASS_TO_STRING(id, options); } +void SnapshotScheduleState::PrepareOperations( + HybridTime last_snapshot_time, HybridTime now, SnapshotScheduleOperations* operations) { + if (creating_snapshot_id_ || + (last_snapshot_time && last_snapshot_time.AddSeconds(options_.interval_sec()) > now)) { + return; + } + creating_snapshot_id_ = TxnSnapshotId::GenerateRandom(); + operations->push_back(SnapshotScheduleOperation { + .schedule_id = id_, + .filter = options_.filter(), + .snapshot_id = creating_snapshot_id_, + }); +} + +void SnapshotScheduleState::SnapshotFinished( + const TxnSnapshotId& snapshot_id, const Status& status) { + if (creating_snapshot_id_ != snapshot_id) { + return; + } + creating_snapshot_id_ = TxnSnapshotId::Nil(); +} + } // namespace master } // namespace yb diff --git a/src/yb/master/snapshot_schedule_state.h b/src/yb/master/snapshot_schedule_state.h index 9fb4ec297c1a..ae2a19435f5b 100644 --- a/src/yb/master/snapshot_schedule_state.h +++ b/src/yb/master/snapshot_schedule_state.h @@ -14,6 +14,7 @@ #ifndef YB_MASTER_SNAPSHOT_SCHEDULE_STATE_H #define YB_MASTER_SNAPSHOT_SCHEDULE_STATE_H +#include "yb/common/hybrid_time.h" #include "yb/common/snapshot.h" #include "yb/docdb/docdb_fwd.h" @@ -24,6 +25,14 @@ namespace yb { namespace master { +struct SnapshotScheduleOperation { + SnapshotScheduleId schedule_id; + SnapshotScheduleFilterPB filter; + TxnSnapshotId snapshot_id; +}; + +using SnapshotScheduleOperations = std::vector; + class SnapshotScheduleState { public: SnapshotScheduleState( @@ -41,14 +50,22 @@ class SnapshotScheduleState { return true; } + void PrepareOperations( + HybridTime last_snapshot_time, HybridTime now, SnapshotScheduleOperations* operations); + void SnapshotFinished(const TxnSnapshotId& snapshot_id, const Status& status); + CHECKED_STATUS StoreToWriteBatch(docdb::KeyValueWriteBatchPB* write_batch); - CHECKED_STATUS ToPB(SnapshotScheduleInfoPB* pb); + CHECKED_STATUS ToPB(SnapshotScheduleInfoPB* pb) const; std::string ToString() const; private: SnapshotCoordinatorContext& context_; SnapshotScheduleId id_; SnapshotScheduleOptionsPB options_; + + // When snapshot is being created for this schedule, this field contains id of this snapshot. + // To prevent creating other snapshots during that time. + TxnSnapshotId creating_snapshot_id_ = TxnSnapshotId::Nil(); }; } // namespace master diff --git a/src/yb/master/snapshot_state.cc b/src/yb/master/snapshot_state.cc index bef1ba28d56e..56ca0358b19d 100644 --- a/src/yb/master/snapshot_state.cc +++ b/src/yb/master/snapshot_state.cc @@ -42,7 +42,8 @@ SnapshotState::SnapshotState( SnapshotCoordinatorContext* context, const TxnSnapshotId& id, const tserver::TabletSnapshotOpRequestPB& request) : StateWithTablets(context, SysSnapshotEntryPB::CREATING), - id_(id), snapshot_hybrid_time_(request.snapshot_hybrid_time()), version_(1) { + id_(id), snapshot_hybrid_time_(request.snapshot_hybrid_time()), + schedule_id_(TryFullyDecodeSnapshotScheduleId(request.schedule_id())), version_(1) { InitTabletIds(request.tablet_id(), request.imported() ? SysSnapshotEntryPB::COMPLETE : SysSnapshotEntryPB::CREATING); request.extra_data().UnpackTo(&entries_); @@ -52,14 +53,18 @@ SnapshotState::SnapshotState( SnapshotCoordinatorContext* context, const TxnSnapshotId& id, const SysSnapshotEntryPB& entry) : StateWithTablets(context, entry.state()), - id_(id), snapshot_hybrid_time_(entry.snapshot_hybrid_time()), version_(entry.version()) { + id_(id), snapshot_hybrid_time_(entry.snapshot_hybrid_time()), + schedule_id_(TryFullyDecodeSnapshotScheduleId(entry.schedule_id())), + version_(entry.version()) { InitTablets(entry.tablet_snapshots()); *entries_.mutable_entries() = entry.entries(); } std::string SnapshotState::ToString() const { - return Format("{ id: $0 snapshot_hybrid_time: $1 version: $2 initial_state: $3 tablets: $4 }", - id_, snapshot_hybrid_time_, version_, InitialStateName(), tablets()); + return Format( + "{ id: $0 snapshot_hybrid_time: $1 schedule_id: $2 version: $3 initial_state: $4 " + "tablets: $5 }", + id_, snapshot_hybrid_time_, schedule_id_, version_, InitialStateName(), tablets()); } Status SnapshotState::ToPB(SnapshotInfoPB* out) { @@ -75,6 +80,10 @@ Status SnapshotState::ToEntryPB(SysSnapshotEntryPB* out, ForClient for_client) { *out->mutable_entries() = entries_.entries(); + if (schedule_id_) { + out->set_schedule_id(schedule_id_.data(), schedule_id_.size()); + } + out->set_version(version_); return Status::OK(); diff --git a/src/yb/master/snapshot_state.h b/src/yb/master/snapshot_state.h index 880d7d499ee2..4bdd533410b3 100644 --- a/src/yb/master/snapshot_state.h +++ b/src/yb/master/snapshot_state.h @@ -59,6 +59,10 @@ class SnapshotState : public StateWithTablets { return snapshot_hybrid_time_; } + const SnapshotScheduleId& schedule_id() const { + return schedule_id_; + } + int version() const { return version_; } @@ -79,6 +83,9 @@ class SnapshotState : public StateWithTablets { TxnSnapshotId id_; HybridTime snapshot_hybrid_time_; SysRowEntries entries_; + // When snapshot is taken as a part of snapshot schedule schedule_id_ will contain this + // schedule id. Otherwise it will be nil. + SnapshotScheduleId schedule_id_; int version_; }; diff --git a/src/yb/master/state_with_tablets.cc b/src/yb/master/state_with_tablets.cc index a747a2ddbdbf..f2069efa727a 100644 --- a/src/yb/master/state_with_tablets.cc +++ b/src/yb/master/state_with_tablets.cc @@ -56,7 +56,7 @@ void StateWithTablets::InitTablets( CheckCompleteness(); } -Result StateWithTablets::AggregatedState() { +Result StateWithTablets::AggregatedState() const { SysSnapshotEntryPB::State result = initial_state_; bool has_initial = false; for (const auto& tablet : tablets_) { @@ -76,10 +76,19 @@ Result StateWithTablets::AggregatedState() { return has_initial ? initial_state_ : result; } -Result StateWithTablets::Complete() { +Result StateWithTablets::Complete() const { return VERIFY_RESULT(AggregatedState()) != initial_state_; } +Status StateWithTablets::AnyFailure() const { + for (const auto& tablet : tablets_) { + if (tablet.state == SysSnapshotEntryPB::FAILED) { + return tablet.last_error; + } + } + return Status::OK(); +} + bool StateWithTablets::AllTabletsDone() const { return num_tablets_in_initial_state_ == 0; } diff --git a/src/yb/master/state_with_tablets.h b/src/yb/master/state_with_tablets.h index b7fbb6dae624..ad41728ec602 100644 --- a/src/yb/master/state_with_tablets.h +++ b/src/yb/master/state_with_tablets.h @@ -51,9 +51,10 @@ class StateWithTablets { // If any of tablets failed returns this failure. // Otherwise if any of tablets is in initial state returns initial state. // Otherwise all tablets should be in the same state, which is returned. - Result AggregatedState(); + Result AggregatedState() const; - Result Complete(); + CHECKED_STATUS AnyFailure() const; + Result Complete() const; bool AllTabletsDone() const; bool PassedSinceCompletion(const MonoDelta& duration) const; std::vector TabletIdsInState(SysSnapshotEntryPB::State state); diff --git a/src/yb/tablet/operations/operation.h b/src/yb/tablet/operations/operation.h index 8276e86b9bba..21eec61e76fb 100644 --- a/src/yb/tablet/operations/operation.h +++ b/src/yb/tablet/operations/operation.h @@ -449,6 +449,25 @@ class WeakSynchronizerOperationCompletionCallback : public OperationCompletionCa std::weak_ptr synchronizer_; }; +template +class FunctorOperationCompletionCallback : public OperationCompletionCallback { + public: + explicit FunctorOperationCompletionCallback(Functor&& functor) + : functor_(std::move(functor)) {} + + void OperationCompleted() override { + functor_(status()); + } + + private: + Functor functor_; +}; + +template +auto MakeFunctorOperationCompletionCallback(Functor&& functor) { + return std::make_unique>(std::move(functor)); +} + } // namespace tablet } // namespace yb diff --git a/src/yb/tserver/backup.proto b/src/yb/tserver/backup.proto index c85e1bacd256..40164453742d 100644 --- a/src/yb/tserver/backup.proto +++ b/src/yb/tserver/backup.proto @@ -58,6 +58,8 @@ message TabletSnapshotOpRequestPB { google.protobuf.Any extra_data = 8; bool imported = 9; + + bytes schedule_id = 10; } message TabletSnapshotOpResponsePB {