Skip to content

Commit

Permalink
hc hive sync status - merge stable-24-1 (#4841)
Browse files Browse the repository at this point in the history
  • Loading branch information
StekPerepolnen authored May 27, 2024
1 parent f6cc07b commit a862c28
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 22 deletions.
3 changes: 1 addition & 2 deletions ydb/core/health_check/health_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -998,8 +998,7 @@ class TSelfCheckRequest : public TActorBootstrapped<TSelfCheckRequest> {
static const int HIVE_SYNCHRONIZATION_PERIOD_MS = 10000;

bool IsHiveSynchronizationPeriod(NKikimrHive::TEvResponseHiveInfo& hiveInfo) {
auto hiveUptime = hiveInfo.GetStartTimeTimestamp() - hiveInfo.GetResponseTimestamp();
return hiveUptime > HIVE_SYNCHRONIZATION_PERIOD_MS;
return hiveInfo.GetResponseTimestamp() < hiveInfo.GetStartTimeTimestamp() + HIVE_SYNCHRONIZATION_PERIOD_MS;
}

void AggregateHiveInfo() {
Expand Down
140 changes: 120 additions & 20 deletions ydb/core/health_check/health_check_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
int const GROUP_START_ID = 1200;
int const VCARD_START_ID = 55;

const TPathId SUBDOMAIN_KEY = {7000000000, 1};

void ChangeDescribeSchemeResult(TEvSchemeShard::TEvDescribeSchemeResult::TPtr* ev, ui64 size = 20000000, ui64 quota = 90000000) {
auto record = (*ev)->Get()->MutableRecord();
auto pool = record->mutable_pathdescription()->mutable_domaindescription()->add_storagepools();
Expand Down Expand Up @@ -146,7 +148,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
}

auto groupId = GROUP_START_ID;

auto group = pbConfig->add_group();
group->CopyFrom(groupSample);
group->set_groupid(groupId);
Expand All @@ -155,7 +157,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {

group->clear_vslotid();
auto vslotId = VCARD_START_ID;

for (auto status: vdiskStatuses) {
auto vslot = pbConfig->add_vslot();
vslot->CopyFrom(vslotSample);
Expand Down Expand Up @@ -192,7 +194,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
}

auto groupId = GROUP_START_ID;

auto group = pbConfig->add_group();
group->CopyFrom(groupSample);
group->set_groupid(groupId);
Expand All @@ -201,7 +203,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {

group->clear_vslotid();
auto vslotId = VCARD_START_ID;

for (auto status: vdiskStatuses) {
auto vslot = pbConfig->add_vslot();
vslot->CopyFrom(vslotSample);
Expand Down Expand Up @@ -547,7 +549,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
auto result = RequestHc(1, 100, false, true);
CheckHcProtobufSizeIssue(result, Ydb::Monitoring::StatusFlag::RED, 1);
}

void ClearLoadAverage(TEvWhiteboard::TEvSystemStateResponse::TPtr* ev) {
auto *systemStateInfo = (*ev)->Get()->Record.MutableSystemStateInfo();
for (NKikimrWhiteboard::TSystemStateInfo &state : *systemStateInfo) {
Expand Down Expand Up @@ -618,7 +620,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
(*ev)->Get()->Record.GetResponse().operation().result().UnpackTo(&listTenantsResult);
for (const auto &path : paths) {
listTenantsResult.Addpaths(path);
}
}
(*ev)->Get()->Record.MutableResponse()->mutable_operation()->mutable_result()->PackFrom(listTenantsResult);
}

Expand All @@ -632,13 +634,13 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
sharedNodeStats->MutableNodeDomain()->SetSchemeShard(SHARED_DOMAIN_KEY.OwnerId);
sharedNodeStats->MutableNodeDomain()->SetPathId(SHARED_DOMAIN_KEY.LocalPathId);
}

if (exclusiveDynNodeId) {
auto *exclusiveNodeStats = record.MutableNodeStats()->Add();
exclusiveNodeStats->SetNodeId(exclusiveDynNodeId);
exclusiveNodeStats->MutableNodeDomain()->SetSchemeShard(SERVERLESS_DOMAIN_KEY.OwnerId);
exclusiveNodeStats->MutableNodeDomain()->SetPathId(SERVERLESS_DOMAIN_KEY.LocalPathId);
}
}
}

Y_UNIT_TEST(SpecificServerless) {
Expand Down Expand Up @@ -671,7 +673,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
}
case TEvTxProxySchemeCache::EvNavigateKeySetResult: {
auto *x = reinterpret_cast<TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr*>(&ev);
ChangeNavigateKeyResultServerless(x, NKikimrSubDomains::EServerlessComputeResourcesModeShared, runtime);
ChangeNavigateKeyResultServerless(x, NKikimrSubDomains::EServerlessComputeResourcesModeShared, runtime);
break;
}
case TEvHive::EvResponseHiveNodeStats: {
Expand Down Expand Up @@ -762,7 +764,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
}
case TEvTxProxySchemeCache::EvNavigateKeySetResult: {
auto *x = reinterpret_cast<TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr*>(&ev);
ChangeNavigateKeyResultServerless(x, NKikimrSubDomains::EServerlessComputeResourcesModeExclusive, runtime);
ChangeNavigateKeyResultServerless(x, NKikimrSubDomains::EServerlessComputeResourcesModeExclusive, runtime);
break;
}
case TEvHive::EvResponseHiveNodeStats: {
Expand Down Expand Up @@ -864,7 +866,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
}
case TEvTxProxySchemeCache::EvNavigateKeySetResult: {
auto *x = reinterpret_cast<TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr*>(&ev);
ChangeNavigateKeyResultServerless(x, NKikimrSubDomains::EServerlessComputeResourcesModeShared, runtime);
ChangeNavigateKeyResultServerless(x, NKikimrSubDomains::EServerlessComputeResourcesModeShared, runtime);
break;
}
case TEvHive::EvResponseHiveNodeStats: {
Expand Down Expand Up @@ -955,7 +957,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
}
case TEvTxProxySchemeCache::EvNavigateKeySetResult: {
auto *x = reinterpret_cast<TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr*>(&ev);
ChangeNavigateKeyResultServerless(x, NKikimrSubDomains::EServerlessComputeResourcesModeExclusive, runtime);
ChangeNavigateKeyResultServerless(x, NKikimrSubDomains::EServerlessComputeResourcesModeExclusive, runtime);
break;
}
case TEvHive::EvResponseHiveNodeStats: {
Expand Down Expand Up @@ -1000,7 +1002,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {

Ctest << result.ShortDebugString();
UNIT_ASSERT_VALUES_EQUAL(result.self_check_result(), Ydb::Monitoring::SelfCheck::GOOD);

bool databaseFoundInResult = false;
for (const auto &database_status : result.database_status()) {
if (database_status.name() == "/Root/serverless") {
Expand All @@ -1018,7 +1020,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
}
UNIT_ASSERT(databaseFoundInResult);
}

Y_UNIT_TEST(ServerlessWhenTroublesWithSharedNodes) {
TPortManager tp;
ui16 port = tp.GetPort(2134);
Expand All @@ -1044,7 +1046,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
}
case TEvTxProxySchemeCache::EvNavigateKeySetResult: {
auto *x = reinterpret_cast<TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr*>(&ev);
ChangeNavigateKeyResultServerless(x, NKikimrSubDomains::EServerlessComputeResourcesModeShared, runtime);
ChangeNavigateKeyResultServerless(x, NKikimrSubDomains::EServerlessComputeResourcesModeShared, runtime);
break;
}
case TEvSchemeShard::EvDescribeSchemeResult: {
Expand Down Expand Up @@ -1073,7 +1075,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
return TTestActorRuntime::EEventAction::PROCESS;
};
runtime.SetObserverFunc(observerFunc);

TActorId sender = runtime.AllocateEdgeActor();
TAutoPtr<IEventHandle> handle;

Expand All @@ -1098,7 +1100,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
UNIT_ASSERT_VALUES_EQUAL(database_status.storage().pools().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(database_status.storage().pools()[0].id(), SHARED_STORAGE_POOL_NAME);
}

Y_UNIT_TEST(ServerlessWithExclusiveNodesWhenTroublesWithSharedNodes) {
TPortManager tp;
ui16 port = tp.GetPort(2134);
Expand Down Expand Up @@ -1141,7 +1143,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
}
case TEvTxProxySchemeCache::EvNavigateKeySetResult: {
auto *x = reinterpret_cast<TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr*>(&ev);
ChangeNavigateKeyResultServerless(x, NKikimrSubDomains::EServerlessComputeResourcesModeExclusive, runtime);
ChangeNavigateKeyResultServerless(x, NKikimrSubDomains::EServerlessComputeResourcesModeExclusive, runtime);
break;
}
case TEvHive::EvResponseHiveNodeStats: {
Expand Down Expand Up @@ -1277,7 +1279,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
}
case TEvTxProxySchemeCache::EvNavigateKeySetResult: {
auto *x = reinterpret_cast<TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr*>(&ev);
ChangeNavigateKeyResultServerless(x, NKikimrSubDomains::EServerlessComputeResourcesModeExclusive, runtime);
ChangeNavigateKeyResultServerless(x, NKikimrSubDomains::EServerlessComputeResourcesModeExclusive, runtime);
break;
}
case TEvHive::EvResponseHiveNodeStats: {
Expand Down Expand Up @@ -1370,6 +1372,104 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
UNIT_ASSERT(sharedDatabaseFoundInResult);
UNIT_ASSERT(rootDatabaseFoundInResult);
}
}

void HiveSyncTest(bool syncPeriod) {
TPortManager tp;
ui16 port = tp.GetPort(2134);
ui16 grpcPort = tp.GetPort(2135);
auto settings = TServerSettings(port)
.SetNodeCount(1)
.SetDynamicNodeCount(1)
.SetUseRealThreads(false)
.SetDomainName("Root");
TServer server(settings);
server.EnableGRpc(grpcPort);
TClient client(settings);
TTestActorRuntime& runtime = *server.GetRuntime();

ui32 dynNodeId = runtime.GetNodeId(1);

auto observerFunc = [&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
case TEvHive::EvResponseHiveInfo: {
auto *x = reinterpret_cast<TEvHive::TEvResponseHiveInfo::TPtr*>(&ev);
auto& record = (*x)->Get()->Record;
record.SetStartTimeTimestamp(0);
if (syncPeriod) {
record.SetResponseTimestamp(NHealthCheck::TSelfCheckRequest::HIVE_SYNCHRONIZATION_PERIOD_MS / 2);
} else {
record.SetResponseTimestamp(NHealthCheck::TSelfCheckRequest::HIVE_SYNCHRONIZATION_PERIOD_MS * 2);
}
auto *tablet = record.MutableTablets()->Add();
tablet->SetTabletID(1);
tablet->SetNodeID(dynNodeId);
tablet->SetTabletType(NKikimrTabletBase::TTabletTypes::DataShard);
tablet->SetVolatileState(NKikimrHive::TABLET_VOLATILE_STATE_BOOTING);
tablet->MutableObjectDomain()->SetSchemeShard(SUBDOMAIN_KEY.OwnerId);
tablet->MutableObjectDomain()->SetPathId(SUBDOMAIN_KEY.LocalPathId);
break;
}
case TEvHive::EvResponseHiveNodeStats: {
auto *x = reinterpret_cast<TEvHive::TEvResponseHiveNodeStats::TPtr*>(&ev);
auto &record = (*x)->Get()->Record;
auto *nodeStats = record.MutableNodeStats()->Add();
nodeStats->SetNodeId(dynNodeId);
nodeStats->MutableNodeDomain()->SetSchemeShard(SUBDOMAIN_KEY.OwnerId);
nodeStats->MutableNodeDomain()->SetPathId(SUBDOMAIN_KEY.LocalPathId);
break;
}
case NConsole::TEvConsole::EvGetTenantStatusResponse: {
auto *x = reinterpret_cast<NConsole::TEvConsole::TEvGetTenantStatusResponse::TPtr*>(&ev);
ChangeGetTenantStatusResponse(x, "/Root/database");
break;
}
case TEvTxProxySchemeCache::EvNavigateKeySetResult: {
auto *x = reinterpret_cast<TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr*>(&ev);
TSchemeCacheNavigate::TEntry& entry((*x)->Get()->Request->ResultSet.front());
entry.Status = TSchemeCacheNavigate::EStatus::Ok;
entry.Kind = TSchemeCacheNavigate::EKind::KindExtSubdomain;
entry.Path = {"Root", "database"};
entry.DomainInfo = MakeIntrusive<TDomainInfo>(SUBDOMAIN_KEY, SUBDOMAIN_KEY);

break;
}
}

return TTestActorRuntime::EEventAction::PROCESS;
};
runtime.SetObserverFunc(observerFunc);

TActorId sender = runtime.AllocateEdgeActor();
TAutoPtr<IEventHandle> handle;

auto *request = new NHealthCheck::TEvSelfCheckRequest;
request->Request.set_return_verbose_status(true);
request->Database = "/Root/database";
runtime.Send(new IEventHandle(NHealthCheck::MakeHealthCheckID(), sender, request, 0));
const auto result = runtime.GrabEdgeEvent<NHealthCheck::TEvSelfCheckResult>(handle)->Result;

Cerr << result.ShortDebugString() << Endl;

UNIT_ASSERT_VALUES_EQUAL(result.database_status_size(), 1);

bool deadTabletIssueFoundInResult = false;
for (const auto &issue_log : result.issue_log()) {
if (issue_log.level() == 4 && issue_log.type() == "TABLET") {
UNIT_ASSERT_VALUES_EQUAL(issue_log.location().compute().tablet().id().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(issue_log.location().compute().tablet().type(), "DataShard");
deadTabletIssueFoundInResult = true;
}
}

UNIT_ASSERT_VALUES_EQUAL(syncPeriod, !deadTabletIssueFoundInResult);
}

Y_UNIT_TEST(HiveSyncPeriodIgnoresTabletsState) {
HiveSyncTest(true);
}

Y_UNIT_TEST(AfterHiveSyncPeriodReportsTabletsState) {
HiveSyncTest(false);
}
}
}

0 comments on commit a862c28

Please sign in to comment.