[yugabyte#7126] PITR: Correct history retention for newly added tablets
Summary: This diff adds logic to propagate the correct history retention to newly created tablets so that they participate in existing snapshot schedules.

Test Plan: ybd --gtest_filter SnapshotScheduleTest.Index

Reviewers: bogdan, amitanand

Reviewed By: amitanand

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D10968
spolitov authored and YintongMa committed May 26, 2021
1 parent 028e50f commit ea600e5
Showing 27 changed files with 355 additions and 356 deletions.
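
At a high level, the change has the master compute which snapshot schedules cover each newly created tablet and ship those schedule ids inside the CreateTablet request, so a new replica retains enough history to participate in the existing schedule instead of garbage-collecting it. Below is a minimal standalone sketch of that retention idea; HybridTime, the helper name, and the exact clamping rule are illustrative assumptions, not YugabyteDB's implementation.

// Standalone sketch (not YugabyteDB code): how a newly created tablet could
// derive its history cutoff when snapshot schedules cover it.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

using HybridTime = uint64_t;  // illustrative stand-in for yb::HybridTime

// Without schedules, history older than (now - retention) may be cleaned up.
// With schedules, the cutoff must not pass the point a scheduled snapshot
// still needs (assumed rule), so the tablet "participates" in the schedule.
HybridTime ComputeHistoryCutoff(HybridTime now, HybridTime retention,
                                const std::vector<HybridTime>& schedule_bounds) {
  HybridTime cutoff = now > retention ? now - retention : 0;
  for (HybridTime bound : schedule_bounds) {
    cutoff = std::min(cutoff, bound);  // never cut history a schedule needs
  }
  return cutoff;
}

int main() {
  // now=1000, retention=10 => default cutoff 990; a schedule pins it to 400.
  std::cout << ComputeHistoryCutoff(1000, 10, {400}) << "\n";  // prints 400
}

This mirrors what the new test checks: after the schedule is created, a freshly added index tablet must report a history cutoff no later than its creation-time hybrid time.
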
5 changes: 4 additions & 1 deletion ent/src/yb/master/catalog_manager.h
@@ -242,7 +242,8 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon
Result<SysRowEntries> CollectEntries(
const google::protobuf::RepeatedPtrField<TableIdentifierPB>& tables,
bool add_indexes,
bool include_parent_colocated_table) override;
bool include_parent_colocated_table,
bool succeed_if_create_in_progress) override;

server::Clock* Clock() override;

@@ -269,6 +270,8 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon

bool IsLeader() override;

Result<SnapshotSchedulesToTabletsMap> MakeSnapshotSchedulesToTabletsMap() override;

static void SetTabletSnapshotsState(SysSnapshotEntryPB::State state,
SysSnapshotEntryPB* snapshot_pb);

12 changes: 9 additions & 3 deletions ent/src/yb/master/catalog_manager_ent.cc
@@ -383,10 +383,12 @@ void CatalogManager::Submit(std::unique_ptr<tablet::Operation> operation) {
Result<SysRowEntries> CatalogManager::CollectEntries(
const google::protobuf::RepeatedPtrField<TableIdentifierPB>& table_identifiers,
bool add_indexes,
bool include_parent_colocated_table) {
bool include_parent_colocated_table,
bool succeed_if_create_in_progress) {
SysRowEntries entries;
auto tables = VERIFY_RESULT(CollectTables(
table_identifiers, add_indexes, include_parent_colocated_table));
table_identifiers, add_indexes, include_parent_colocated_table,
succeed_if_create_in_progress));
for (const auto& table : tables) {
// TODO(txn_snapshot) use single lock to resolve all tables to tablets
SnapshotInfo::AddEntries(table, entries.mutable_entries(), /* tablet_infos= */ nullptr);
@@ -402,7 +404,7 @@ server::Clock* CatalogManager::Clock() {
Status CatalogManager::CreateTransactionAwareSnapshot(
const CreateSnapshotRequestPB& req, CreateSnapshotResponsePB* resp, rpc::RpcContext* rpc) {
SysRowEntries entries = VERIFY_RESULT(CollectEntries(
req.tables(), req.add_indexes(), true /* include_parent_colocated_table */));
req.tables(), req.add_indexes(), true /* include_parent_colocated_table */, false));

auto snapshot_id = VERIFY_RESULT(snapshot_coordinator_.Create(
entries, req.imported(), rpc->GetClientDeadline()));
@@ -3333,6 +3335,10 @@ void CatalogManager::Started() {
snapshot_coordinator_.Start();
}

Result<SnapshotSchedulesToTabletsMap> CatalogManager::MakeSnapshotSchedulesToTabletsMap() {
return snapshot_coordinator_.MakeSnapshotSchedulesToTabletsMap();
}

} // namespace enterprise
} // namespace master
} // namespace yb
54 changes: 54 additions & 0 deletions src/yb/client/snapshot-schedule-test.cc
@@ -13,11 +13,16 @@

#include "yb/client/snapshot_test_base.h"

#include "yb/client/session.h"

#include "yb/master/master.h"
#include "yb/master/master_backup.proxy.h"

#include "yb/tablet/tablet_peer.h"
#include "yb/tablet/tablet_retention_policy.h"

#include "yb/yql/cql/ql/util/statement_result.h"

using namespace std::literals;

DECLARE_bool(enable_history_cutoff_propagation);
@@ -183,6 +188,55 @@ TEST_F(SnapshotScheduleTest, GC) {
}
}

TEST_F(SnapshotScheduleTest, Index) {
FLAGS_timestamp_history_retention_interval_sec = kTimeMultiplier;

auto schedule_id = ASSERT_RESULT(CreateSchedule());
ASSERT_OK(WaitScheduleSnapshot(schedule_id));

CreateIndex(Transactional::kTrue, 1, false);
auto hybrid_time = cluster_->mini_master(0)->master()->clock()->Now();
constexpr int kTransaction = 0;
constexpr auto op_type = WriteOpType::INSERT;

auto session = CreateSession();
for (size_t r = 0; r != kNumRows; ++r) {
ASSERT_OK(kv_table_test::WriteRow(
&index_, session, KeyForTransactionAndIndex(kTransaction, r),
ValueForTransactionAndIndex(kTransaction, r, op_type), op_type));
}

LOG(INFO) << "Index columns: " << AsString(index_.AllColumnNames());
for (size_t r = 0; r != kNumRows; ++r) {
const auto key = KeyForTransactionAndIndex(kTransaction, r);
const auto fetched = ASSERT_RESULT(kv_table_test::SelectRow(
&index_, session, key, kValueColumn));
ASSERT_EQ(key, fetched);
}

auto peers = ListTabletPeers(cluster_.get(), [index_id = index_->id()](const auto& peer) {
return peer->tablet_metadata()->table_id() == index_id;
});

ASSERT_OK(WaitFor([this, peers, hybrid_time]() -> Result<bool> {
auto snapshots = VERIFY_RESULT(ListSnapshots());
if (snapshots.size() == 2) {
return true;
}

for (const auto& peer : peers) {
SCOPED_TRACE(Format(
"T $0 P $1 Table $2", peer->tablet_id(), peer->permanent_uuid(),
peer->tablet_metadata()->table_name()));
auto tablet = peer->tablet();
auto history_cutoff = tablet->RetentionPolicy()->GetRetentionDirective().history_cutoff;
SCHECK_LE(history_cutoff, hybrid_time, IllegalState, "Too big history cutoff");
}

return false;
}, kSnapshotInterval, "Second snapshot"));
}

TEST_F(SnapshotScheduleTest, Restart) {
ASSERT_NO_FATALS(WriteData());
auto schedule_id = ASSERT_RESULT(CreateSchedule());
7 changes: 7 additions & 0 deletions src/yb/common/snapshot.h
@@ -14,6 +14,10 @@
#ifndef YB_COMMON_SNAPSHOT_H
#define YB_COMMON_SNAPSHOT_H

#include <unordered_map>

#include "yb/common/entity_ids.h"

#include "yb/util/strongly_typed_uuid.h"

namespace yb {
@@ -22,6 +26,9 @@ YB_STRONGLY_TYPED_UUID(TxnSnapshotId);
YB_STRONGLY_TYPED_UUID(TxnSnapshotRestorationId);
YB_STRONGLY_TYPED_UUID(SnapshotScheduleId);

using SnapshotSchedulesToTabletsMap =
std::unordered_map<SnapshotScheduleId, std::vector<TabletId>, SnapshotScheduleIdHash>;

} // namespace yb

#endif // YB_COMMON_SNAPSHOT_H
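
The alias above keys an unordered_map by a strongly typed UUID, which is why it must name SnapshotScheduleIdHash explicitly: std::unordered_map has no default hash for such wrapper types. A minimal standalone sketch of the same pattern, where ScheduleId and its hash are simplified stand-ins for what YB_STRONGLY_TYPED_UUID generates:

// Standalone sketch: a strongly typed id used as an unordered_map key.
#include <array>
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

struct ScheduleId {
  std::array<unsigned char, 16> bytes{};
  bool operator==(const ScheduleId& o) const { return bytes == o.bytes; }
};

struct ScheduleIdHash {
  size_t operator()(const ScheduleId& id) const {
    // Hash the raw bytes; a real implementation would hash more directly.
    return std::hash<std::string>()(
        std::string(id.bytes.begin(), id.bytes.end()));
  }
};

using TabletId = std::string;
using SchedulesToTabletsMap =
    std::unordered_map<ScheduleId, std::vector<TabletId>, ScheduleIdHash>;

int main() {
  SchedulesToTabletsMap map;
  map[ScheduleId{}].push_back("tablet-1");  // compiles only with the hash arg
}
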
8 changes: 7 additions & 1 deletion src/yb/master/async_rpc_tasks.cc
@@ -479,7 +479,8 @@ TabletServerId AsyncTabletLeaderTask::permanent_uuid() const {
AsyncCreateReplica::AsyncCreateReplica(Master *master,
ThreadPool *callback_pool,
const string& permanent_uuid,
const scoped_refptr<TabletInfo>& tablet)
const scoped_refptr<TabletInfo>& tablet,
const std::vector<SnapshotScheduleId>& snapshot_schedules)
: RetrySpecificTSRpcTask(master, callback_pool, permanent_uuid, tablet->table().get()),
tablet_id_(tablet->tablet_id()) {
deadline_ = start_ts_;
@@ -503,6 +504,11 @@ AsyncCreateReplica::AsyncCreateReplica(Master *master,
if (table_lock->data().pb.has_index_info()) {
req_.mutable_index_info()->CopyFrom(table_lock->data().pb.index_info());
}
auto& req_schedules = *req_.mutable_snapshot_schedules();
req_schedules.Reserve(snapshot_schedules.size());
for (const auto& id : snapshot_schedules) {
req_schedules.Add()->assign(id.AsSlice().cdata(), id.size());
}
}

void AsyncCreateReplica::HandleResponse(int attempt) {
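
The constructor change above copies each schedule id into the request's repeated snapshot_schedules field as raw UUID bytes via Reserve() and Add()->assign(). A standalone sketch of that serialization step, using std::vector<std::string> as an illustrative stand-in for the protobuf RepeatedPtrField:

// Standalone sketch: serialize 16-byte ids into a repeated bytes field.
#include <array>
#include <cassert>
#include <string>
#include <vector>

using ScheduleId = std::array<char, 16>;  // stand-in for the typed UUID

void AddSchedules(const std::vector<ScheduleId>& ids,
                  std::vector<std::string>* wire_field) {
  wire_field->reserve(wire_field->size() + ids.size());  // like Reserve()
  for (const auto& id : ids) {
    // Like req_schedules.Add()->assign(id.AsSlice().cdata(), id.size()):
    // copy the raw 16 bytes, not a hex/text rendering of the UUID.
    wire_field->emplace_back(id.data(), id.size());
  }
}

int main() {
  std::vector<std::string> field;
  AddSchedules({ScheduleId{}}, &field);
  assert(field.size() == 1 && field[0].size() == 16);
}
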
3 changes: 2 additions & 1 deletion src/yb/master/async_rpc_tasks.h
@@ -296,7 +296,8 @@ class AsyncCreateReplica : public RetrySpecificTSRpcTask {
AsyncCreateReplica(Master *master,
ThreadPool *callback_pool,
const std::string& permanent_uuid,
const scoped_refptr<TabletInfo>& tablet);
const scoped_refptr<TabletInfo>& tablet,
const std::vector<SnapshotScheduleId>& snapshot_schedules);

Type type() const override { return ASYNC_CREATE_REPLICA; }

34 changes: 23 additions & 11 deletions src/yb/master/catalog_manager.cc
@@ -3553,7 +3553,8 @@ Status CatalogManager::FindNamespace(const NamespaceIdentifierPB& ns_identifier,
return FindNamespaceUnlocked(ns_identifier, ns_info);
}

Result<TableDescription> CatalogManager::DescribeTable(const TableIdentifierPB& table_identifier) {
Result<TableDescription> CatalogManager::DescribeTable(
const TableIdentifierPB& table_identifier, bool succeed_if_create_in_progress) {
TableDescription result;

// Lookup the table and verify it exists.
@@ -3569,7 +3570,7 @@ Result<TableDescription> CatalogManager::DescribeTable(const TableIdentifierPB&
TRACE("Locking table");
auto l = result.table_info->LockForRead();

if (result.table_info->IsCreateInProgress()) {
if (!succeed_if_create_in_progress && result.table_info->IsCreateInProgress()) {
return STATUS(IllegalState, "Table creation is in progress", result.table_info->ToString(),
MasterError(MasterErrorPB::TABLE_CREATION_IS_IN_PROGRESS));
}
@@ -7842,8 +7843,7 @@ Status CatalogManager::ProcessPendingAssignments(const TabletInfos& tablets) {
}
}
// Send the CreateTablet() requests to the servers. This is asynchronous / non-blocking.
SendCreateTabletRequests(deferred.needs_create_rpc);
return Status::OK();
return SendCreateTabletRequests(deferred.needs_create_rpc);
}

Status CatalogManager::SelectReplicasForTablet(const TSDescriptorVector& ts_descs,
@@ -7978,18 +7978,27 @@ Status CatalogManager::HandlePlacementUsingPlacementInfo(const PlacementInfoPB&
return Status::OK();
}

void CatalogManager::SendCreateTabletRequests(const vector<TabletInfo*>& tablets) {
Status CatalogManager::SendCreateTabletRequests(const vector<TabletInfo*>& tablets) {
auto schedules_to_tablets_map = VERIFY_RESULT(MakeSnapshotSchedulesToTabletsMap());
for (TabletInfo *tablet : tablets) {
const consensus::RaftConfigPB& config =
tablet->metadata().dirty().pb.committed_consensus_state().config();
tablet->set_last_update_time(MonoTime::Now());
std::vector<SnapshotScheduleId> schedules;
for (const auto& pair : schedules_to_tablets_map) {
if (std::binary_search(pair.second.begin(), pair.second.end(), tablet->id())) {
schedules.push_back(pair.first);
}
}
for (const RaftPeerPB& peer : config.peers()) {
auto task = std::make_shared<AsyncCreateReplica>(master_, AsyncTaskPool(),
peer.permanent_uuid(), tablet);
peer.permanent_uuid(), tablet, schedules);
tablet->table()->AddTask(task);
WARN_NOT_OK(ScheduleTask(task), "Failed to send new tablet request");
}
}

return Status::OK();
}

// If responses have been received from sufficient replicas (including hinted leader),
@@ -9003,12 +9012,14 @@ Status CatalogManager::ScheduleTask(std::shared_ptr<RetryingTSRpcTask> task) {
Result<vector<TableDescription>> CatalogManager::CollectTables(
const google::protobuf::RepeatedPtrField<TableIdentifierPB>& tables,
bool add_indexes,
bool include_parent_colocated_table) {
bool include_parent_colocated_table,
bool succeed_if_create_in_progress) {
vector<TableDescription> all_tables;
unordered_set<NamespaceId> parent_colocated_table_ids;

for (const auto& table_id_pb : tables) {
TableDescription table_description = VERIFY_RESULT(DescribeTable(table_id_pb));
auto table_description = VERIFY_RESULT(DescribeTable(
table_id_pb, succeed_if_create_in_progress));
if (include_parent_colocated_table && table_description.table_info->colocated()) {
// If a table is colocated, add its parent colocated table as well.
const auto parent_table_id =
@@ -9019,8 +9030,8 @@
TableIdentifierPB parent_table_pb;
parent_table_pb.set_table_id(parent_table_id);
parent_table_pb.mutable_namespace_()->set_id(table_description.namespace_info->id());
TableDescription parent_table_description = VERIFY_RESULT(DescribeTable(parent_table_pb));
all_tables.push_back(parent_table_description);
all_tables.push_back(VERIFY_RESULT(DescribeTable(
parent_table_pb, succeed_if_create_in_progress)));
}
}
all_tables.push_back(table_description);
@@ -9047,7 +9058,8 @@
TableIdentifierPB index_id_pb;
index_id_pb.set_table_id(index_info.table_id());
index_id_pb.mutable_namespace_()->set_id(table_description.namespace_info->id());
all_tables.push_back(VERIFY_RESULT(DescribeTable(index_id_pb)));
all_tables.push_back(VERIFY_RESULT(DescribeTable(
index_id_pb, succeed_if_create_in_progress)));
}
}
}
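
SendCreateTabletRequests now derives, for each tablet, the schedules covering it by probing every schedule's tablet list with std::binary_search, which is valid only because MakeSnapshotSchedulesToTabletsMap sorts each list. A standalone sketch of that inverted lookup, with simplified string ids and a hypothetical helper name:

// Standalone sketch: find which schedules cover a given tablet.
#include <algorithm>
#include <cassert>
#include <map>
#include <string>
#include <vector>

using ScheduleId = std::string;  // simplified stand-ins for the typed ids
using TabletId = std::string;

std::vector<ScheduleId> SchedulesForTablet(
    const std::map<ScheduleId, std::vector<TabletId>>& schedules_to_tablets,
    const TabletId& tablet_id) {
  std::vector<ScheduleId> result;
  for (const auto& [schedule, tablets] : schedules_to_tablets) {
    // Requires each tablet list to be sorted; see std::sort in the
    // coordinator's MakeSnapshotSchedulesToTabletsMap.
    if (std::binary_search(tablets.begin(), tablets.end(), tablet_id)) {
      result.push_back(schedule);
    }
  }
  return result;
}

int main() {
  std::map<ScheduleId, std::vector<TabletId>> m{
      {"daily", {"t1", "t3"}}, {"hourly", {"t2", "t3"}}};
  assert(SchedulesForTablet(m, "t3").size() == 2);
}
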
12 changes: 9 additions & 3 deletions src/yb/master/catalog_manager.h
@@ -689,7 +689,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
CHECKED_STATUS FindTable(const TableIdentifierPB& table_identifier,
scoped_refptr<TableInfo>* table_info);

Result<TableDescription> DescribeTable(const TableIdentifierPB& table_identifier);
Result<TableDescription> DescribeTable(
const TableIdentifierPB& table_identifier, bool succeed_if_create_in_progress);

void AssertLeaderLockAcquiredForReading() const {
leader_lock_.AssertAcquiredForReading();
@@ -742,7 +743,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
Result<std::vector<TableDescription>> CollectTables(
const google::protobuf::RepeatedPtrField<TableIdentifierPB>& tables,
bool add_indexes,
bool include_parent_colocated_table = false);
bool include_parent_colocated_table = false,
bool succeed_if_create_in_progress = false);

// Returns 'table_replication_info' itself if set. Else looks up placement info for its
// 'tablespace_id'. If neither is set, returns the cluster level replication info.
@@ -1023,7 +1025,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
//
// This must be called after persisting the tablet state as
// CREATING to ensure coherent state after Master failover.
void SendCreateTabletRequests(const std::vector<TabletInfo*>& tablets);
CHECKED_STATUS SendCreateTabletRequests(const std::vector<TabletInfo*>& tablets);

// Send the "alter table request" to all tablets of the specified table.
//
@@ -1233,6 +1235,10 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
// the cluster config affinity specification.
CHECKED_STATUS SysCatalogRespectLeaderAffinity();

virtual Result<SnapshotSchedulesToTabletsMap> MakeSnapshotSchedulesToTabletsMap() {
return SnapshotSchedulesToTabletsMap();
}

// ----------------------------------------------------------------------------------------------
// Private member fields
// ----------------------------------------------------------------------------------------------
34 changes: 32 additions & 2 deletions src/yb/master/master_snapshot_coordinator.cc
@@ -384,6 +384,28 @@ class MasterSnapshotCoordinator::Impl {
return Status::OK();
}

Result<SnapshotSchedulesToTabletsMap> MakeSnapshotSchedulesToTabletsMap() {
std::vector<std::pair<SnapshotScheduleId, SnapshotScheduleFilterPB>> schedules;
{
std::lock_guard<std::mutex> lock(mutex_);
for (const auto& schedule : schedules_) {
schedules.emplace_back(schedule->id(), schedule->options().filter());
}
}
SnapshotSchedulesToTabletsMap result;
for (const auto& id_and_filter : schedules) {
auto entries = VERIFY_RESULT(CollectEntries(id_and_filter.second));
auto& tablets = result[id_and_filter.first];
for (const auto& entry : entries.entries()) {
if (entry.type() == SysRowEntry::TABLET) {
tablets.push_back(entry.id());
}
}
std::sort(tablets.begin(), tablets.end());
}
return result;
}

void Start() {
poller_.Start(&context_.Scheduler(), FLAGS_snapshot_coordinator_poll_interval_ms * 1ms);
}
@@ -601,8 +623,7 @@ class MasterSnapshotCoordinator::Impl {
}

CHECKED_STATUS ExecuteScheduleOperation(const SnapshotScheduleOperation& operation) {
auto entries = VERIFY_RESULT(context_.CollectEntries(
operation.filter.tables().tables(), true, true));
auto entries = VERIFY_RESULT(CollectEntries(operation.filter));
RETURN_NOT_OK(SubmitCreate(
entries, false, operation.schedule_id, operation.snapshot_id,
tablet::MakeFunctorOperationCompletionCallback(
@@ -757,6 +778,10 @@ class MasterSnapshotCoordinator::Impl {
return Status::OK();
}

Result<SysRowEntries> CollectEntries(const SnapshotScheduleFilterPB& filter) {
return context_.CollectEntries(filter.tables().tables(), true, true, true);
}

SnapshotCoordinatorContext& context_;
std::mutex mutex_;
class ScheduleTag;
@@ -880,5 +905,10 @@ Status MasterSnapshotCoordinator::FillHeartbeatResponse(TSHeartbeatResponsePB* r
return impl_->FillHeartbeatResponse(resp);
}

Result<SnapshotSchedulesToTabletsMap>
MasterSnapshotCoordinator::MakeSnapshotSchedulesToTabletsMap() {
return impl_->MakeSnapshotSchedulesToTabletsMap();
}

} // namespace master
} // namespace yb
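
MakeSnapshotSchedulesToTabletsMap copies the schedule ids and filters into a local vector while holding mutex_, then calls CollectEntries only after releasing it, so the potentially slow catalog walk never runs inside the critical section. A standalone sketch of this copy-under-lock pattern; the Coordinator class below is illustrative, not the YugabyteDB one:

// Standalone sketch: snapshot shared state under a lock, work outside it.
#include <mutex>
#include <string>
#include <utility>
#include <vector>

struct Filter { std::string expr; };

class Coordinator {
 public:
  std::vector<std::string> CollectAll() {
    std::vector<std::pair<std::string, Filter>> snapshot;
    {
      std::lock_guard<std::mutex> lock(mutex_);  // short critical section
      snapshot = schedules_;                     // cheap copy of ids/filters
    }
    std::vector<std::string> out;
    for (const auto& [id, filter] : snapshot) {
      out.push_back(Collect(id, filter));  // slow work, done without the lock
    }
    return out;
  }

 private:
  std::string Collect(const std::string& id, const Filter& f) {
    return id + ":" + f.expr;  // placeholder for the real CollectEntries()
  }

  std::mutex mutex_;
  std::vector<std::pair<std::string, Filter>> schedules_;
};

int main() { Coordinator c; c.CollectAll(); }
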
3 changes: 3 additions & 0 deletions src/yb/master/master_snapshot_coordinator.h
@@ -76,6 +76,9 @@ class MasterSnapshotCoordinator : public tablet::SnapshotCoordinator {

CHECKED_STATUS FillHeartbeatResponse(TSHeartbeatResponsePB* resp);

// Returns a map from snapshot schedule id to a sorted vector of tablet ids covered by that schedule.
Result<SnapshotSchedulesToTabletsMap> MakeSnapshotSchedulesToTabletsMap();

void Start();

void Shutdown();
3 changes: 2 additions & 1 deletion src/yb/master/snapshot_coordinator_context.h
@@ -63,7 +63,8 @@
virtual Result<SysRowEntries> CollectEntries(
const google::protobuf::RepeatedPtrField<TableIdentifierPB>& tables,
bool add_indexes,
bool include_parent_colocated_table) = 0;
bool include_parent_colocated_table,
bool succeed_if_create_in_progress) = 0;

virtual const Schema& schema() = 0;
