Merge tag '24.3.15' into stream-nb-24-3_plus_24.3.15
 Conflicts:
	ydb/core/client/object_storage_listing_ut.cpp
	ydb/core/mind/bscontroller/impl.h
	ydb/core/tablet_flat/flat_database.cpp
	ydb/core/tablet_flat/flat_database.h
	ydb/core/tx/datashard/datashard__read_iterator.cpp
	ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
dcherednik committed Jan 17, 2025
2 parents 85aafe9 + 6b41c90 commit f40fc1e
Showing 401 changed files with 9,055 additions and 3,549 deletions.
23 changes: 23 additions & 0 deletions ydb/core/base/hive.h
@@ -49,6 +49,8 @@ namespace NKikimr {
EvUpdateTabletsObject,
EvUpdateDomain,
EvRequestTabletDistribution,
EvRequestScaleRecommendation,
EvConfigureScaleRecommender,

// replies
EvBootTabletReply = EvBootTablet + 512,
@@ -84,6 +86,8 @@ namespace NKikimr {
EvUpdateTabletsObjectReply,
EvUpdateDomainReply,
EvResponseTabletDistribution,
EvResponseScaleRecommendation,
EvConfigureScaleRecommenderReply,

EvEnd
};
@@ -876,6 +880,25 @@ namespace NKikimr {

struct TEvResponseTabletDistribution : TEventPB<TEvResponseTabletDistribution,
NKikimrHive::TEvResponseTabletDistribution, EvResponseTabletDistribution> {};

struct TEvRequestScaleRecommendation : TEventPB<TEvRequestScaleRecommendation,
NKikimrHive::TEvRequestScaleRecommendation, EvRequestScaleRecommendation>
{
TEvRequestScaleRecommendation() = default;

TEvRequestScaleRecommendation(TSubDomainKey domainKey) {
Record.MutableDomainKey()->CopyFrom(domainKey);
}
};

struct TEvResponseScaleRecommendation : TEventPB<TEvResponseScaleRecommendation,
NKikimrHive::TEvResponseScaleRecommendation, EvResponseScaleRecommendation> {};

struct TEvConfigureScaleRecommender : TEventPB<TEvConfigureScaleRecommender,
NKikimrHive::TEvConfigureScaleRecommender, EvConfigureScaleRecommender> {};

struct TEvConfigureScaleRecommenderReply : TEventPB<TEvConfigureScaleRecommenderReply,
NKikimrHive::TEvConfigureScaleRecommenderReply, EvConfigureScaleRecommenderReply> {};
};

IActor* CreateDefaultHive(const TActorId &tablet, TTabletStorageInfo *info);
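The four structs above complete the request/reply event pairs for Hive's new scale recommender. A rough usage sketch only (the actor class, the tablet pipe to Hive, and the DomainKey value are assumptions for illustration and are not part of this commit):

// Hypothetical client side; PipeClient is an already-established tablet pipe to Hive.
void TMyActor::RequestScaleRecommendation(const TActorContext& ctx) {
    NTabletPipe::SendData(ctx, PipeClient,
        new TEvHive::TEvRequestScaleRecommendation(DomainKey));
}

// The reply arrives as TEvResponseScaleRecommendation; its protobuf payload
// (NKikimrHive::TEvResponseScaleRecommendation) is available via Record.
void TMyActor::Handle(TEvHive::TEvResponseScaleRecommendation::TPtr& ev, const TActorContext&) {
    const auto& record = ev->Get()->Record;
    Y_UNUSED(record); // the recommendation fields live in hive.proto and are not shown in this diff
}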
4 changes: 3 additions & 1 deletion ydb/core/blobstorage/dsproxy/dsproxy.h
@@ -566,7 +566,7 @@ class TBlobStorageGroupRequestActor : public TActor<TDerived> {

// ensure that we are dying for the first time
Y_ABORT_UNLESS(!Dead);
if (RequestHandleClass && PoolCounters) {
if (RequestHandleClass && PoolCounters && FirstResponse) {
PoolCounters->GetItem(*RequestHandleClass, RequestBytes).Register(
RequestBytes, GeneratedSubrequests, GeneratedSubrequestBytes, Timer.Passed());
}
@@ -594,6 +594,7 @@ class TBlobStorageGroupRequestActor : public TActor<TDerived> {

// send the reply to original request sender
Derived().Send(source, ev.release(), 0, cookie);
FirstResponse = false;
};

void SendResponse(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats = nullptr) {
@@ -658,6 +659,7 @@ class TBlobStorageGroupRequestActor : public TActor<TDerived> {
TActorId ProxyActorId;
std::shared_ptr<TEvBlobStorage::TExecutionRelay> ExecutionRelay;
bool ExecutionRelayUsed = false;
bool FirstResponse = true;
};

void Encrypt(char *destination, const char *source, size_t shift, size_t sizeBytes, const TLogoBlobID &id,
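The new FirstResponse member guards the pool-counter registration above: a request actor that ends up sending more than one reply now registers its per-request counters only for the first one. A simplified, self-contained sketch of that guard (names and output are invented for illustration, not taken from the proxy code):

#include <cstdio>

struct TRequestSketch {
    bool FirstResponse = true; // mirrors the member added to TBlobStorageGroupRequestActor

    void SendReply() {
        if (FirstResponse) {
            // stand-in for PoolCounters->GetItem(...).Register(...)
            std::puts("register pool counters exactly once");
        }
        FirstResponse = false;
        // ... the actual reply would be sent here ...
    }
};

int main() {
    TRequestSketch req;
    req.SendReply(); // counters registered
    req.SendReply(); // no double counting
    return 0;
}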
5 changes: 3 additions & 2 deletions ydb/core/blobstorage/nodewarden/distconf.cpp
@@ -19,10 +19,11 @@ namespace NKikimr::NStorage {
STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");

auto ns = NNodeBroker::BuildNameserverTable(Cfg->NameserviceConfig);
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>();
auto nodes = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
for (const auto& [nodeId, item] : ns->StaticNodeTable) {
ev->Nodes.emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
nodes->emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
}
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>(nodes);
Send(SelfId(), ev.release());

// and subscribe for the node list too
28 changes: 16 additions & 12 deletions ydb/core/client/minikql_compile/yql_expr_minikql.cpp
@@ -395,20 +395,24 @@ class TKikimrCallableTypeAnnotationTransformer : public TSyncTransformerBase {
YQL_ENSURE(column);
typeConstraint = column->TypeConstraint;

// TODO: support pg types
auto columnTypeId = column->Type.GetTypeId();
YQL_ENSURE(columnTypeId != NScheme::NTypeIds::Pg, "pg types are not supported");

// Decimal type is transformed into parametrized Decimal(22, 9).
if (columnTypeId == NYql::NProto::TypeIds::Decimal) {
switch (column->Type.GetTypeId()) {
case NScheme::NTypeIds::Pg: {
// TODO: support pg types
YQL_ENSURE(false, "pg types are not supported");
break;
}
case NScheme::NTypeIds::Decimal: {
columnDataType = ctx.MakeType<TDataExprParamsType>(
NUdf::GetDataSlot(columnTypeId),
ToString(NScheme::DECIMAL_PRECISION),
ToString(NScheme::DECIMAL_SCALE));
} else {
EDataSlot::Decimal,
ToString(column->Type.GetDecimalType().GetPrecision()),
ToString(column->Type.GetDecimalType().GetScale()));
break;
}
default:{
columnDataType = GetMkqlDataTypeAnnotation(
TDataType::Create(columnTypeId, *MkqlCtx->TypeEnv),
ctx);
TDataType::Create(column->Type.GetTypeId(), *MkqlCtx->TypeEnv), ctx);
break;
}
}
}

77 changes: 73 additions & 4 deletions ydb/core/client/object_storage_listing_ut.cpp
@@ -575,7 +575,7 @@ Y_UNIT_TEST_SUITE(TObjectStorageListingTest) {
}
}

void TestS3GenericListingRequest(const TVector<TString>& prefixColumns, const TString& pathPrefix, const TString& pathDelimiter,
Ydb::ObjectStorage::ListingResponse TestS3GenericListingRequest(const TVector<TString>& prefixColumns, const TString& pathPrefix, const TString& pathDelimiter,
const TVector<TString>& startAfterSuffixColumns,
const TVector<TString>& columnsToReturn, ui32 maxKeys,
Ydb::StatusIds_StatusCode expectedStatus = Ydb::StatusIds::SUCCESS,
@@ -597,9 +597,10 @@
} else {
UNIT_ASSERT_VALUES_EQUAL(response.issues().size(), 0);
}
return response;
}

void TestS3ListingRequest(const TVector<TString>& prefixColumns,
Ydb::ObjectStorage::ListingResponse TestS3ListingRequest(const TVector<TString>& prefixColumns,
const TString& pathPrefix, const TString& pathDelimiter,
const TString& startAfter, const TVector<TString>& columnsToReturn, ui32 maxKeys,
Ydb::StatusIds_StatusCode expectedStatus = Ydb::StatusIds::SUCCESS,
@@ -609,7 +610,7 @@
if (!startAfter.empty()) {
startAfterSuffix.push_back(startAfter);
}
TestS3GenericListingRequest(prefixColumns, pathPrefix, pathDelimiter,
return TestS3GenericListingRequest(prefixColumns, pathPrefix, pathDelimiter,
startAfterSuffix,
columnsToReturn, maxKeys,
expectedStatus, expectedErrMessage);
@@ -1055,7 +1056,7 @@ Y_UNIT_TEST_SUITE(TObjectStorageListingTest) {

TVector<TString> folders;
TVector<TString> files;
DoS3Listing(GRPC_PORT, 100, "/", "/", nullptr, nullptr, {}, 1000, folders, files, Ydb::ObjectStorage::ListingRequest_EMatchType_EQUAL);
DoS3Listing(GRPC_PORT, 100, "/", "/", "", "", {}, 1000, folders, files, Ydb::ObjectStorage::ListingRequest_EMatchType_EQUAL);

TVector<TString> expectedFolders = {"/Photos/"};
TVector<TString> expectedFiles = {};
@@ -1077,6 +1078,74 @@
// the range of second partition. Third (last) partition will always be checked.
UNIT_ASSERT_EQUAL(2, count);
}

Y_UNIT_TEST(Decimal) {
TPortManager pm;
ui16 port = pm.GetPort(2134);
NKikimrConfig::TFeatureFlags featureFlags;
featureFlags.SetEnableParameterizedDecimal(true);
TServerSettings serverSettings(port);
serverSettings.SetFeatureFlags(featureFlags);
TServer cleverServer = TServer(serverSettings);
GRPC_PORT = pm.GetPort(2135);
cleverServer.EnableGRpc(GRPC_PORT);

TFlatMsgBusClient annoyingClient(port);
annoyingClient.InitRoot();
annoyingClient.MkDir("/dc-1", "Dir");
annoyingClient.CreateTable("/dc-1/Dir",
R"_(Name: "Table"
Columns { Name: "Hash" Type: "Uint64"}
Columns { Name: "Name" Type: "Utf8"}
Columns { Name: "Path" Type: "Utf8"}
Columns { Name: "Version" Type: "Uint64"}
Columns { Name: "DecimalData" Type: "Decimal"}
Columns { Name: "Decimal35Data" Type: "Decimal(35,10)"}
KeyColumnNames: [
"Hash",
"Name",
"Path",
"Version"
]
)_");

TString insertRowQuery = R"(
(
(let key '(
'('Hash (Uint64 '%llu))
'('Name (Utf8 '"%s"))
'('Path (Utf8 '"%s"))
'('Version (Uint64 '%llu))
))
(let value '(
'('DecimalData (Decimal '"%s" '22 '9))
'('Decimal35Data (Decimal '"%s" '35 '10))
))
(let ret_ (AsList
(UpdateRow '/dc-1/Dir/%s key value)
))
(return ret_)
)
)";

annoyingClient.FlatQuery(Sprintf(insertRowQuery.data(), 100, "Bucket100", "/Path1", 1, "123.321", "355555555555555.123456789", "Table" ));

Ydb::ObjectStorage::ListingResponse response = TestS3ListingRequest({"100", "Bucket100"}, "/", "/", "", {"DecimalData", "Decimal35Data"}, 10,
Ydb::StatusIds::SUCCESS,
"");

Ydb::ResultSet resultSet = response.contents();
UNIT_ASSERT_VALUES_EQUAL(resultSet.columns_size(), 4);
UNIT_ASSERT_STRINGS_EQUAL(resultSet.columns(2).name(), "Decimal35Data");
UNIT_ASSERT_VALUES_EQUAL(resultSet.columns(2).type().optional_type().item().decimal_type().precision(), 35);
UNIT_ASSERT_VALUES_EQUAL(resultSet.columns(2).type().optional_type().item().decimal_type().scale(), 10);
UNIT_ASSERT_STRINGS_EQUAL(resultSet.columns(3).name(), "DecimalData");
UNIT_ASSERT_VALUES_EQUAL(resultSet.columns(3).type().optional_type().item().decimal_type().precision(), 22);
UNIT_ASSERT_VALUES_EQUAL(resultSet.columns(3).type().optional_type().item().decimal_type().scale(), 9);
UNIT_ASSERT_VALUES_EQUAL(resultSet.rows(0).items(2).low_128(), 975580256289238738);
UNIT_ASSERT_VALUES_EQUAL(resultSet.rows(0).items(2).high_128(), 192747);
UNIT_ASSERT_VALUES_EQUAL(resultSet.rows(0).items(3).low_128(), 123321000000);
}
}

}}
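For the Decimal test above, the asserted low_128/high_128 halves follow from the value being stored as a 128-bit integer scaled by 10^scale; a small self-contained check of that arithmetic (the storage-format reading is inferred from the asserted numbers, not stated by the diff; __int128 is a GCC/Clang extension used only to split the value into 64-bit halves):

#include <cassert>
#include <cstdint>

int main() {
    // DecimalData: 123.321 with scale 9 -> 123.321 * 10^9 = 123321000000, fits in the low half
    unsigned __int128 d22 = 123321000000ULL;
    assert((uint64_t)d22 == 123321000000ULL && (uint64_t)(d22 >> 64) == 0);

    // Decimal35Data: 355555555555555.123456789 with scale 10
    //   -> 3555555555555551234567890 = 192747 * 2^64 + 975580256289238738
    unsigned __int128 d35 = ((unsigned __int128)192747 << 64) | 975580256289238738ULL;
    unsigned __int128 expected = (unsigned __int128)355555555555555ULL * 10000000000ULL  // integer part * 10^10
                               + 1234567890ULL;                                          // fractional digits
    assert(d35 == expected);
    return 0;
}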
7 changes: 4 additions & 3 deletions ydb/core/cms/cms.cpp
@@ -927,7 +927,7 @@ bool TCms::TryToLockVDisk(const TActionOptions& opts,
return false;
}

auto counters = CreateErasureCounter(ClusterInfo->BSGroup(groupId).Erasure.GetErasure(), vdisk, groupId);
auto counters = CreateErasureCounter(ClusterInfo->BSGroup(groupId).Erasure.GetErasure(), vdisk, groupId, TabletCounters);
counters->CountGroupState(ClusterInfo, State->Config.DefaultRetryTime, duration, error);

switch (opts.AvailabilityMode) {
Expand All @@ -942,10 +942,11 @@ bool TCms::TryToLockVDisk(const TActionOptions& opts,
}
break;
case MODE_FORCE_RESTART:
if ( counters->GroupAlreadyHasLockedDisks() && opts.PartialPermissionAllowed) {
if (counters->GroupAlreadyHasLockedDisks() && !counters->GroupHasMoreThanOneDiskPerNode() && opts.PartialPermissionAllowed) {
TabletCounters->Cumulative()[COUNTER_PARTIAL_PERMISSIONS_OPTIMIZED].Increment(1);
error.Code = TStatus::DISALLOW_TEMP;
error.Reason = "You cannot get two or more disks from the same group at the same time"
" without specifying the PartialPermissionAllowed parameter";
" in partial permissions allowed mode";
error.Deadline = defaultDeadline;
return false;
}
21 changes: 21 additions & 0 deletions ydb/core/cms/cms_maintenance_api_ut.cpp
@@ -95,6 +95,27 @@ Y_UNIT_TEST_SUITE(TMaintenanceApiTest) {
UNIT_ASSERT_VALUES_EQUAL(a2.reason(), ActionState::ACTION_REASON_TOO_MANY_UNAVAILABLE_VDISKS);
UNIT_ASSERT(a2.reason_details().Contains("too many unavailable vdisks"));
}

Y_UNIT_TEST(SimplifiedMirror3DC) {
TTestEnvOpts options(3);
options.UseMirror3dcErasure = true;
options.DataCenterCount = 3;
TCmsTestEnv env(options);

auto response = env.CheckMaintenanceTaskCreate(
"task-1",
Ydb::StatusIds::SUCCESS,
Ydb::Maintenance::AVAILABILITY_MODE_WEAK,
MakeActionGroup(
MakeLockAction(env.GetNodeId(0), TDuration::Minutes(10))
)
);

UNIT_ASSERT_VALUES_EQUAL(response.action_group_states().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(response.action_group_states(0).action_states().size(), 1);
const auto &a = response.action_group_states(0).action_states(0);
UNIT_ASSERT_VALUES_EQUAL(a.status(), ActionState::ACTION_STATUS_PERFORMED);
}
}

} // namespace NKikimr::NCmsTest
68 changes: 46 additions & 22 deletions ydb/core/cms/cms_ut_common.cpp
@@ -239,20 +239,21 @@ class TFakeTenantPool : public TActorBootstrapped<TFakeTenantPool> {

void GenerateExtendedInfo(TTestActorRuntime &runtime, NKikimrBlobStorage::TBaseConfig *config,
ui32 pdisks, ui32 vdiskPerPdisk = 4, const TNodeTenantsMap &tenants = {}, bool useMirror3dcErasure = false)
{
{
constexpr ui32 MIRROR_3DC_VDISKS_COUNT = 9;
constexpr ui32 BLOCK_4_2_VDISKS_COUNT = 8;

ui32 numNodes = runtime.GetNodeCount();
ui32 numNodeGroups = pdisks * vdiskPerPdisk;
ui32 vdisksPerNode = pdisks * vdiskPerPdisk;
ui32 numGroups;

if (numNodes < 9)
useMirror3dcErasure = false;

if (useMirror3dcErasure)
numGroups = numNodes * numNodeGroups / 9;
else if (numNodes >= 8)
numGroups = numNodes * numNodeGroups / 8;
numGroups = numNodes * vdisksPerNode / MIRROR_3DC_VDISKS_COUNT;
else if (numNodes >= BLOCK_4_2_VDISKS_COUNT)
numGroups = numNodes * vdisksPerNode / BLOCK_4_2_VDISKS_COUNT;
else
numGroups = numNodes * numNodeGroups;
numGroups = numNodes * vdisksPerNode;

ui32 maxOneGroupVdisksPerNode = useMirror3dcErasure && numNodes < MIRROR_3DC_VDISKS_COUNT ? 3 : 1;

auto now = runtime.GetTimeProvider()->Now();
for (ui32 groupId = 0; groupId < numGroups; ++groupId) {
Expand All @@ -261,7 +262,7 @@ void GenerateExtendedInfo(TTestActorRuntime &runtime, NKikimrBlobStorage::TBaseC
group.SetGroupGeneration(1);
if (useMirror3dcErasure)
group.SetErasureSpecies("mirror-3-dc");
else if (numNodes >= 8)
else if (numNodes >= BLOCK_4_2_VDISKS_COUNT)
group.SetErasureSpecies("block-4-2");
else
group.SetErasureSpecies("none");
@@ -284,12 +285,18 @@
} else {
node.SystemStateInfo.AddRoles("Storage");
}

ui32 groupShift = (nodeIndex / 8) * pdisks * vdiskPerPdisk;
if (numNodes < 8)
groupShift = nodeIndex * numNodeGroups;
if (useMirror3dcErasure)
groupShift = (nodeIndex / 9) * pdisks * vdiskPerPdisk;

ui32 groupsPerNode = vdisksPerNode / maxOneGroupVdisksPerNode;
ui32 groupShift;
if (useMirror3dcErasure) {
ui32 groupNodesSize = MIRROR_3DC_VDISKS_COUNT / maxOneGroupVdisksPerNode;
groupShift = (nodeIndex / groupNodesSize) * groupsPerNode;
} else if (numNodes >= BLOCK_4_2_VDISKS_COUNT) {
ui32 groupNodesSize = BLOCK_4_2_VDISKS_COUNT / maxOneGroupVdisksPerNode;
groupShift = (nodeIndex / groupNodesSize) * groupsPerNode;
} else {
groupShift = nodeIndex * groupsPerNode;
}

for (ui32 pdiskIndex = 0; pdiskIndex < pdisks; ++pdiskIndex) {
auto pdiskId = nodeId * pdisks + pdiskIndex;
@@ -316,12 +323,28 @@

for (ui8 vdiskIndex = 0; vdiskIndex < vdiskPerPdisk; ++vdiskIndex) {
ui32 vdiskId = pdiskIndex * vdiskPerPdisk + vdiskIndex;
ui32 groupId = groupShift + vdiskId;
ui32 groupId = groupShift + vdiskId / maxOneGroupVdisksPerNode;

if (groupId >= config->GroupSize()) {
break;
}

ui32 failRealm = 0;
if (useMirror3dcErasure)
failRealm = (nodeIndex % 9) / 3;
if (useMirror3dcErasure) {
if (numNodes >= MIRROR_3DC_VDISKS_COUNT) {
failRealm = (nodeIndex % MIRROR_3DC_VDISKS_COUNT) / 3;
} else {
failRealm = nodeIndex % 3;
}
}

TVDiskID id = {(ui8)groupId, 1, (ui8)failRealm, (ui8)(nodeIndex % 8), (ui8)0};
TVDiskID id = {
(ui8)groupId,
1,
(ui8)failRealm,
(ui8)(nodeIndex % BLOCK_4_2_VDISKS_COUNT),
(ui8)(vdiskId % maxOneGroupVdisksPerNode)
};

auto &vdisk = node.VDiskStateInfo[id];
VDiskIDFromVDiskID(id, vdisk.MutableVDiskId());
@@ -339,7 +362,8 @@
vdiskConfig.SetGroupId(groupId);
vdiskConfig.SetGroupGeneration(1);
vdiskConfig.SetFailRealmIdx(failRealm);
vdiskConfig.SetFailDomainIdx(nodeIndex % 8);
vdiskConfig.SetFailDomainIdx(nodeIndex % BLOCK_4_2_VDISKS_COUNT);
vdiskConfig.SetVDiskIdx(vdiskId % maxOneGroupVdisksPerNode);

config->MutableGroup(groupId)->AddVSlotId()
->CopyFrom(vdiskConfig.GetVSlotId());
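The revised arithmetic above packs up to three vdisks of the same mirror-3-dc group onto one node when fewer than nine nodes are available, so the new SimplifiedMirror3DC test can run on a three-node cluster. A worked example of those formulas (the concrete counts of 3 nodes, 1 pdisk, and 3 vdisks per pdisk are illustrative assumptions, not values from the diff):

#include <cassert>

int main() {
    const unsigned MIRROR_3DC_VDISKS_COUNT = 9;
    const unsigned numNodes = 3, pdisks = 1, vdiskPerPdisk = 3;

    const unsigned vdisksPerNode = pdisks * vdiskPerPdisk;                          // 3
    const unsigned numGroups = numNodes * vdisksPerNode / MIRROR_3DC_VDISKS_COUNT;  // 1
    const unsigned maxOneGroupVdisksPerNode = numNodes < MIRROR_3DC_VDISKS_COUNT ? 3 : 1;
    const unsigned groupsPerNode = vdisksPerNode / maxOneGroupVdisksPerNode;        // 1
    assert(numGroups == 1 && groupsPerNode == 1);

    for (unsigned nodeIndex = 0; nodeIndex < numNodes; ++nodeIndex) {
        const unsigned groupNodesSize = MIRROR_3DC_VDISKS_COUNT / maxOneGroupVdisksPerNode; // 3
        const unsigned groupShift = (nodeIndex / groupNodesSize) * groupsPerNode;           // 0
        const unsigned failRealm = nodeIndex % 3;   // each node becomes its own fail realm
        assert(groupShift == 0 && failRealm == nodeIndex);
        for (unsigned vdiskId = 0; vdiskId < vdisksPerNode; ++vdiskId) {
            const unsigned groupId = groupShift + vdiskId / maxOneGroupVdisksPerNode; // always 0
            const unsigned vdiskIdx = vdiskId % maxOneGroupVdisksPerNode;             // 0..2
            assert(groupId == 0 && vdiskIdx == vdiskId);
        }
    }
    return 0;
}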