diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 8b18726c9499..9c2edf68ce4b 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,25 +1 @@ -/*.md @ydb-platform/docs - -/ydb/core/fq/ @ydb-platform/fq -/ydb/core/public_http/ @ydb-platform/fq - -/ydb/docs/ @ydb-platform/docs - -/ydb/library/yql/ @ydb-platform/yql -/ydb/library/yql/dq @ydb-platform/yql @ydb-platform/qp -/ydb/library/yql/providers/generic @ydb-platform/fq -/ydb/library/yql/providers/pq @ydb-platform/fq -/ydb/library/yql/providers/s3 @ydb-platform/fq -/ydb/library/yql/providers/solomon @ydb-platform/fq - -/ydb/library/yql/yt @Krock21 @Krisha11 @zlobober @gritukan - -/ydb/services/fq/ @ydb-platform/fq - -/ydb/core/kafka_proxy @ydb-platform/Topics -/ydb/core/persqueue @ydb-platform/Topics -/ydb/services/datastreams @ydb-platform/Topics -/ydb/services/deprecated/persqueue_v0 @ydb-platform/Topics -/ydb/services/persqueue_v1 @ydb-platform/Topics - -/ydb/core/config/ut @ydb-platform/core +* @ydb-platform/ReleaseApprovers diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 63a8df0cbe9f..98ba4fbd71d9 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -8,12 +8,10 @@ ydb/core/external_sources * ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient ydb/core/keyvalue/ut_trace TKeyValueTracingTest.* ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata -ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization ydb/core/kqp/ut/olap KqpOlapBlobsSharing.* -ydb/core/kqp/ut/olap KqpOlap.ScanQueryOltpAndOlap ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL -ydb/core/kqp/ut/olap KqpOlap.YqlScriptOltpAndOlap ydb/core/kqp/ut/olap KqpOlapAggregations.Aggregation_ResultCountAll_FilterL +ydb/core/kqp/ut/olap KqpOlapWrite.WriteDeleteCleanGC ydb/core/kqp/ut/pg KqpPg.CreateIndex ydb/core/kqp/ut/query KqpLimits.QueryReplySize ydb/core/kqp/ut/query KqpQuery.QueryTimeout @@ -98,6 +96,7 @@ ydb/tests/fq/yds * ydb/tests/fq/control_plane_storage * ydb/tests/functional/audit * ydb/tests/functional/blobstorage test_replication.py.TestReplicationAfterNodesRestart.test_replication* +ydb/tests/functional/clickbench test.py.test_plans[column] ydb/tests/functional/kqp/kqp_indexes ConsistentIndexRead.InteractiveTx ydb/tests/functional/kqp/kqp_query_session KqpQuerySession.NoLocalAttach ydb/tests/functional/restarts test_restarts.py.* diff --git a/library/cpp/lwtrace/mon/mon_lwtrace.cpp b/library/cpp/lwtrace/mon/mon_lwtrace.cpp index 09d56560c4bf..a10ae4a775f8 100644 --- a/library/cpp/lwtrace/mon/mon_lwtrace.cpp +++ b/library/cpp/lwtrace/mon/mon_lwtrace.cpp @@ -301,7 +301,7 @@ struct TLogQuery { } } catch (...) { ythrow yexception() - << CurrentExceptionMessage() + << EncodeHtmlPcdata(CurrentExceptionMessage()) << " while parsing track log query: " << Text; } @@ -1853,7 +1853,7 @@ class TTracesHtmlPrinter { try { Os << src->GetStartTime().ToStringUpToSeconds(); } catch (...) { - Os << "error: " << CurrentExceptionMessage(); + Os << "error: " << EncodeHtmlPcdata(CurrentExceptionMessage()); } Os << "" << "
"
- << CurrentExceptionMessage()
+ << EncodeHtmlPcdata(CurrentExceptionMessage())
<< Endl;
}
}
diff --git a/ydb/apps/version/version_definition.cpp b/ydb/apps/version/version_definition.cpp
index 4cfb93b74f79..8d7273d188fc 100644
--- a/ydb/apps/version/version_definition.cpp
+++ b/ydb/apps/version/version_definition.cpp
@@ -2,10 +2,21 @@
NKikimrConfig::TCurrentCompatibilityInfo NKikimr::TCompatibilityInfo::MakeCurrent() {
using TCurrentConstructor = NKikimr::TCompatibilityInfo::TProtoConstructor::TCurrentCompatibilityInfo;
- // using TVersionConstructor = NKikimr::TCompatibilityInfo::TProtoConstructor::TVersion;
- // using TCompatibilityRuleConstructor = NKikimr::TCompatibilityInfo::TProtoConstructor::TCompatibilityRule;
+ using TVersionConstructor = NKikimr::TCompatibilityInfo::TProtoConstructor::TVersion;
+ using TCompatibilityRuleConstructor = NKikimr::TCompatibilityInfo::TProtoConstructor::TCompatibilityRule;
return TCurrentConstructor{
.Application = "ydb",
+ .Version = TVersionConstructor{
+ .Year = 24,
+ .Major = 3,
+ },
+ .CanConnectTo = {
+ TCompatibilityRuleConstructor{
+ .Application = "nbs",
+ .LowerLimit = TVersionConstructor{ .Year = 23, .Major = 3 },
+ .UpperLimit = TVersionConstructor{ .Year = 24, .Major = 3 },
+ }
+ }
}.ToPB();
-}
+}
\ No newline at end of file
diff --git a/ydb/apps/ydbd/ya.make b/ydb/apps/ydbd/ya.make
index d399248c2264..12d685e849f1 100644
--- a/ydb/apps/ydbd/ya.make
+++ b/ydb/apps/ydbd/ya.make
@@ -1,6 +1,8 @@
PROGRAM(ydbd)
-NO_EXPORT_DYNAMIC_SYMBOLS()
+IF (NOT SANITIZER_TYPE) # for some reasons some tests with asan are failed, see comment in CPPCOM-32
+ NO_EXPORT_DYNAMIC_SYMBOLS()
+ENDIF()
IF (OS_LINUX)
ALLOCATOR(TCMALLOC_256K)
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index a73efb723307..14eab5de98e9 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -12,175 +12,178 @@ namespace NKikimr {
struct TKikimrEvents : TEvents {
enum EEventSpaceKikimr {
/* WARNING:
- Please mind that you should never change the order
- for the following keywords, you should consider
- issues about "rolling update".
+ Please mind that you should never change values,
+ you should consider issues about "rolling update".
*/
- ES_KIKIMR_ES_BEGIN = ES_USERSPACE, //4096
- ES_STATESTORAGE, //4097
- ES_DEPRECATED_4098, //4098
- ES_BLOBSTORAGE, //4099
- ES_HIVE, //4100
- ES_TABLETBASE, //4101
- ES_TABLET, //4102
- ES_TABLETRESOLVER,
- ES_LOCAL,
- ES_DEPRECATED_4105,
- ES_TX_PROXY, // generic proxy commands 4106
- ES_TX_COORDINATOR,
- ES_TX_MEDIATOR,
- ES_TX_PROCESSING, // 4109
- ES_DEPRECATED_4110,
- ES_DEPRECATED_4111,
- ES_DEPRECATED_4112,
- ES_TX_DATASHARD,
- ES_DEPRECATED_4114,
- ES_TX_USERPROXY, // user proxy interface
- ES_SCHEME_CACHE,
- ES_TX_PROXY_REQ,
- ES_TABLET_PIPE,
- ES_DEPRECATED_4118,
- ES_TABLET_COUNTERS_AGGREGATOR,
- ES_DEPRECATED_4121,
- ES_PROXY_BUS, //4122
- ES_BOOTSTRAPPER,
- ES_TX_MEDIATORTIMECAST,
- ES_DEPRECATED_4125,
- ES_DEPRECATED_4126,
- ES_DEPRECATED_4127,
- ES_DEPRECATED_4128,
- ES_DEPRECATED_4129,
- ES_DEPRECATED_4130,
- ES_DEPRECATED_4131,
- ES_KEYVALUE, //4132
- ES_MSGBUS_TRACER,
- ES_RTMR_TABLET,
- ES_FLAT_EXECUTOR,
- ES_NODE_WHITEBOARD,
- ES_FLAT_TX_SCHEMESHARD, // 4137
- ES_PQ,
- ES_YQL_KIKIMR_PROXY,
- ES_PQ_META_CACHE,
- ES_DEPRECATED_4141,
- ES_PQ_L2_CACHE, //4142
- ES_TOKEN_BUILDER,
- ES_TICKET_PARSER,
- ES_KQP = NYql::NDq::TDqEvents::ES_DQ_COMPUTE_KQP_COMPATIBLE, // 4145
- ES_BLACKBOX_VALIDATOR,
- ES_SELF_PING,
- ES_PIPECACHE,
- ES_PQ_PROXY,
- ES_CMS,
- ES_NODE_BROKER,
- ES_TX_ALLOCATOR, //4152
+ ES_KIKIMR_ES_BEGIN = ES_USERSPACE, // 4096
+ ES_STATESTORAGE = 4097,
+ ES_DEPRECATED_4098 = 4098,
+ ES_BLOBSTORAGE = 4099,
+ ES_HIVE = 4100,
+ ES_TABLETBASE = 4101,
+ ES_TABLET = 4102,
+ ES_TABLETRESOLVER = 4103,
+ ES_LOCAL = 4104,
+ ES_DEPRECATED_4105 = 4105,
+ ES_TX_PROXY = 4106,
+ ES_TX_COORDINATOR = 4107,
+ ES_TX_MEDIATOR = 4108,
+ ES_TX_PROCESSING = 4109,
+ ES_DEPRECATED_4110 = 4110,
+ ES_DEPRECATED_4111 = 4111,
+ ES_DEPRECATED_4112 = 4112,
+ ES_TX_DATASHARD = 4113,
+ ES_DEPRECATED_4114 = 4114,
+ ES_TX_USERPROXY = 4115,
+ ES_SCHEME_CACHE = 4116,
+ ES_TX_PROXY_REQ = 4117,
+ ES_TABLET_PIPE = 4118,
+ ES_DEPRECATED_4118 = 4119,
+ ES_TABLET_COUNTERS_AGGREGATOR = 4120,
+ ES_DEPRECATED_4121 = 4121,
+ ES_PROXY_BUS = 4122,
+ ES_BOOTSTRAPPER = 4123,
+ ES_TX_MEDIATORTIMECAST = 4124,
+ ES_DEPRECATED_4125 = 4125,
+ ES_DEPRECATED_4126 = 4126,
+ ES_DEPRECATED_4127 = 4127,
+ ES_DEPRECATED_4128 = 4128,
+ ES_DEPRECATED_4129 = 4129,
+ ES_DEPRECATED_4130 = 4130,
+ ES_DEPRECATED_4131 = 4131,
+ ES_KEYVALUE = 4132,
+ ES_MSGBUS_TRACER = 4133,
+ ES_RTMR_TABLET = 4134,
+ ES_FLAT_EXECUTOR = 4135,
+ ES_NODE_WHITEBOARD = 4136,
+ ES_FLAT_TX_SCHEMESHARD = 4137,
+ ES_PQ = 4138,
+ ES_YQL_KIKIMR_PROXY = 4139,
+ ES_PQ_META_CACHE = 4140,
+ ES_DEPRECATED_4141 = 4141,
+ ES_PQ_L2_CACHE = 4142,
+ ES_TOKEN_BUILDER = 4143,
+ ES_TICKET_PARSER = 4144,
+ ES_KQP = 4145,
+ ES_BLACKBOX_VALIDATOR = 4146,
+ ES_SELF_PING = 4147,
+ ES_PIPECACHE = 4148,
+ ES_PQ_PROXY = 4149,
+ ES_CMS = 4150,
+ ES_NODE_BROKER = 4151,
+ ES_TX_ALLOCATOR = 4152,
// reserve event space for each RTMR process
- ES_RTMR_STORAGE,
- ES_RTMR_PROXY,
- ES_RTMR_PUSHER,
- ES_RTMR_HOST,
- ES_RESOURCE_BROKER,
- ES_VIEWER,
- ES_SUB_DOMAIN,
- ES_GRPC_PROXY_STATUS, //OLD
- ES_SQS,
- ES_BLOCKSTORE, //4162
- ES_RTMR_ICBUS,
- ES_TENANT_POOL,
- ES_USER_REGISTRY,
- ES_TVM_SETTINGS_UPDATER,
- ES_PQ_CLUSTERS_UPDATER,
- ES_TENANT_SLOT_BROKER,
- ES_GRPC_CALLS,
- ES_CONSOLE,
- ES_KESUS_PROXY,
- ES_KESUS,
- ES_CONFIGS_DISPATCHER,
- ES_IAM_SERVICE,
- ES_FOLDER_SERVICE,
- ES_GRPC_MON,
- ES_QUOTA, // must be in sync with ydb/core/quoter/public/quoter.h
- ES_COORDINATED_QUOTA,
- ES_ACCESS_SERVICE,
- ES_USER_ACCOUNT_SERVICE,
- ES_PQ_PROXY_NEW,
- ES_GRPC_STREAMING,
- ES_SCHEME_BOARD,
- ES_FLAT_TX_SCHEMESHARD_PROTECTED,
- ES_GRPC_REQUEST_PROXY,
- ES_EXPORT_SERVICE,
- ES_TX_ALLOCATOR_CLIENT,
- ES_PQ_CLUSTER_TRACKER,
- ES_NET_CLASSIFIER,
- ES_SYSTEM_VIEW,
- ES_TENANT_NODE_ENUMERATOR,
- ES_SERVICE_ACCOUNT_SERVICE,
- ES_INDEX_BUILD,
- ES_BLOCKSTORE_PRIVATE,
- ES_YT_WRAPPER,
- ES_S3_WRAPPER,
- ES_FILESTORE,
- ES_FILESTORE_PRIVATE,
- ES_YDB_METERING,
- ES_IMPORT_SERVICE, // 4200
- ES_TX_OLAPSHARD,
- ES_TX_COLUMNSHARD,
- ES_CROSSREF,
- ES_SCHEME_BOARD_MON,
- ES_YQL_ANALYTICS_PROXY = NFq::TEventIds::ES_YQL_ANALYTICS_PROXY,
- ES_BLOB_CACHE,
- ES_LONG_TX_SERVICE,
- ES_TEST_SHARD,
- ES_DATASTREAMS_PROXY,
- ES_IAM_TOKEN_SERVICE,
- ES_HEALTH_CHECK,
- ES_DQ = NYql::NDq::TDqEvents::ES_DQ_COMPUTE, // 4212
- ES_YQ, // 4213
- ES_CHANGE_EXCHANGE_DATASHARD,
- ES_DATABASE_SERVICE, //4215
- ES_SEQUENCESHARD, // 4216
- ES_SEQUENCEPROXY, // 4217
- ES_CLOUD_STORAGE,
- ES_CLOUD_STORAGE_PRIVATE,
- ES_FOLDER_SERVICE_ADAPTER,
- ES_PQ_PARTITION_WRITER,
- ES_YDB_PROXY,
- ES_REPLICATION_CONTROLLER,
- ES_HTTP_PROXY,
- ES_BLOB_DEPOT,
- ES_DATASHARD_LOAD,
- ES_METADATA_PROVIDER,
- ES_INTERNAL_REQUEST,
- ES_BACKGROUND_TASKS,
- ES_TIERING,
- ES_METADATA_INITIALIZER,
- ES_YDB_AUDIT_LOG,
- ES_METADATA_MANAGER,
- ES_METADATA_SECRET,
- ES_TEST_LOAD,
- ES_GRPC_CANCELATION,
- ES_DISCOVERY,
- ES_EXT_INDEX,
- ES_CONVEYOR,
- ES_KQP_SCAN_EXCHANGE,
- ES_IC_NODE_CACHE,
- ES_DATA_OPERATIONS,
- ES_KAFKA,
- ES_STATISTICS,
- ES_LDAP_AUTH_PROVIDER,
- ES_DB_METADATA_CACHE,
- ES_TABLE_CREATOR,
- ES_PQ_PARTITION_CHOOSER,
- ES_GRAPH,
- ES_REPLICATION_WORKER,
- ES_CHANGE_EXCHANGE,
- ES_S3_PROVIDER,
- ES_NEBIUS_ACCESS_SERVICE,
- ES_REPLICATION_SERVICE,
- ES_BACKUP_SERVICE,
- ES_TX_BACKGROUND,
- ES_SS_BG_TASKS,
- ES_LIMITER
+ ES_RTMR_STORAGE = 4153,
+ ES_RTMR_PROXY = 4154,
+ ES_RTMR_PUSHER = 4155,
+ ES_RTMR_HOST = 4156,
+ ES_RESOURCE_BROKER = 4157,
+ ES_VIEWER = 4158,
+ ES_SUB_DOMAIN = 4159,
+ ES_GRPC_PROXY_STATUS = 4160,
+ ES_SQS = 4161,
+ ES_BLOCKSTORE = 4162,
+ ES_RTMR_ICBUS = 4163,
+ ES_TENANT_POOL = 4164,
+ ES_USER_REGISTRY = 4165,
+ ES_TVM_SETTINGS_UPDATER = 4166,
+ ES_PQ_CLUSTERS_UPDATER = 4167,
+ ES_TENANT_SLOT_BROKER = 4168,
+ ES_GRPC_CALLS = 4169,
+ ES_CONSOLE = 4170,
+ ES_KESUS_PROXY = 4171,
+ ES_KESUS = 4172,
+ ES_CONFIGS_DISPATCHER = 4173,
+ ES_IAM_SERVICE = 4174,
+ ES_FOLDER_SERVICE = 4175,
+ ES_GRPC_MON = 4176,
+ ES_QUOTA = 4177, // must be in sync with ydb/core/quoter/public/quoter.h
+ ES_COORDINATED_QUOTA = 4178,
+ ES_ACCESS_SERVICE = 4179,
+ ES_USER_ACCOUNT_SERVICE = 4180,
+ ES_PQ_PROXY_NEW = 4181,
+ ES_GRPC_STREAMING = 4182,
+ ES_SCHEME_BOARD = 4183,
+ ES_FLAT_TX_SCHEMESHARD_PROTECTED = 4184,
+ ES_GRPC_REQUEST_PROXY = 4185,
+ ES_EXPORT_SERVICE = 4186,
+ ES_TX_ALLOCATOR_CLIENT = 4187,
+ ES_PQ_CLUSTER_TRACKER = 4188,
+ ES_NET_CLASSIFIER = 4189,
+ ES_SYSTEM_VIEW = 4190,
+ ES_TENANT_NODE_ENUMERATOR = 4191,
+ ES_SERVICE_ACCOUNT_SERVICE = 4192,
+ ES_INDEX_BUILD = 4193,
+ ES_BLOCKSTORE_PRIVATE = 4194,
+ ES_YT_WRAPPER = 4195,
+ ES_S3_WRAPPER = 4196,
+ ES_FILESTORE = 4197,
+ ES_FILESTORE_PRIVATE = 4198,
+ ES_YDB_METERING = 4199,
+ ES_IMPORT_SERVICE = 4200,
+ ES_TX_OLAPSHARD = 4201,
+ ES_TX_COLUMNSHARD = 4202,
+ ES_CROSSREF = 4203,
+ ES_SCHEME_BOARD_MON = 4204,
+ ES_YQL_ANALYTICS_PROXY = 4205,
+ ES_BLOB_CACHE = 4206,
+ ES_LONG_TX_SERVICE = 4207,
+ ES_TEST_SHARD = 4208,
+ ES_DATASTREAMS_PROXY = 4209,
+ ES_IAM_TOKEN_SERVICE = 4210,
+ ES_HEALTH_CHECK = 4211,
+ ES_DQ = 4212,
+ ES_YQ = 4213,
+ ES_CHANGE_EXCHANGE_DATASHARD = 4214,
+ ES_DATABASE_SERVICE = 4215,
+ ES_SEQUENCESHARD = 4216,
+ ES_SEQUENCEPROXY = 4217,
+ ES_CLOUD_STORAGE = 4218,
+ ES_CLOUD_STORAGE_PRIVATE = 4219,
+ ES_FOLDER_SERVICE_ADAPTER = 4220,
+ ES_PQ_PARTITION_WRITER = 4221,
+ ES_YDB_PROXY = 4222,
+ ES_REPLICATION_CONTROLLER = 4223,
+ ES_HTTP_PROXY = 4224,
+ ES_BLOB_DEPOT = 4225,
+ ES_DATASHARD_LOAD = 4226,
+ ES_METADATA_PROVIDER = 4227,
+ ES_INTERNAL_REQUEST = 4228,
+ ES_BACKGROUND_TASKS = 4229,
+ ES_TIERING = 4230,
+ ES_METADATA_INITIALIZER = 4231,
+ ES_YDB_AUDIT_LOG = 4232,
+ ES_METADATA_MANAGER = 4233,
+ ES_METADATA_SECRET = 4234,
+ ES_TEST_LOAD = 4235,
+ ES_GRPC_CANCELATION = 4236,
+ ES_DISCOVERY = 4237,
+ ES_EXT_INDEX = 4238,
+ ES_CONVEYOR = 4239,
+ ES_KQP_SCAN_EXCHANGE = 4240,
+ ES_IC_NODE_CACHE = 4241,
+ ES_DATA_OPERATIONS = 4242,
+ ES_KAFKA = 4243,
+ ES_STATISTICS = 4244,
+ ES_LDAP_AUTH_PROVIDER = 4245,
+ ES_DB_METADATA_CACHE = 4246,
+ ES_TABLE_CREATOR = 4247,
+ ES_PQ_PARTITION_CHOOSER = 4248,
+ ES_GRAPH = 4249,
+ ES_REPLICATION_WORKER = 4250,
+ ES_CHANGE_EXCHANGE = 4251,
+ ES_S3_PROVIDER = 4252,
+ ES_NEBIUS_ACCESS_SERVICE = 4253,
+ ES_REPLICATION_SERVICE = 4254,
+ ES_BACKUP_SERVICE = 4255,
+ ES_TX_BACKGROUND = 4256,
+ ES_SS_BG_TASKS = 4257,
+ ES_LIMITER = 4258,
};
};
+static_assert((int)TKikimrEvents::EEventSpaceKikimr::ES_KQP == (int)NYql::NDq::TDqEvents::ES_DQ_COMPUTE_KQP_COMPATIBLE);
+static_assert((int)TKikimrEvents::EEventSpaceKikimr::ES_DQ == (int)NYql::NDq::TDqEvents::ES_DQ_COMPUTE);
+static_assert((int)TKikimrEvents::EEventSpaceKikimr::ES_YQL_ANALYTICS_PROXY == (int)NFq::TEventIds::ES_YQL_ANALYTICS_PROXY);
+
}
diff --git a/ydb/core/base/path.h b/ydb/core/base/path.h
index ca7c1403f56b..71263b8c6b79 100644
--- a/ydb/core/base/path.h
+++ b/ydb/core/base/path.h
@@ -37,4 +37,12 @@ inline TVector ChildPath(const TVector& parentPath, const TStr
return path;
}
+inline TVector ChildPath(const TVector& parentPath, const TVector& childPath) {
+ auto path = parentPath;
+ for (const auto& childName : childPath) {
+ path.push_back(childName);
+ }
+ return path;
+}
+
}
diff --git a/ydb/core/base/pool_stats_collector.cpp b/ydb/core/base/pool_stats_collector.cpp
index 0ddce4c08d37..7a06d5a3dd9f 100644
--- a/ydb/core/base/pool_stats_collector.cpp
+++ b/ydb/core/base/pool_stats_collector.cpp
@@ -46,9 +46,9 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor {
void OnWakeup(const TActorContext &ctx) override {
MiniKQLPoolStats.Update();
- TVector> pools;
+ TVector> pools;
for (const auto& pool : PoolCounters) {
- pools.emplace_back(pool.Name, pool.Usage, pool.Threads);
+ pools.emplace_back(pool.Name, pool.Usage, pool.Threads, pool.LimitThreads);
}
ctx.Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ctx.SelfID.NodeId()), new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools));
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
index 19659e6b7759..fbb545409a89 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
@@ -29,10 +29,33 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor FoundParts;
@@ -69,12 +94,16 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor EmptyResponseFlags;
TStackVec ErrorResponseFlags;
TStackVec ForceStopFlags;
+ TStackVec SlowFlags;
TBlobStorageGroupInfo::TVDiskIds VDisks;
bool UseVPatch = false;
bool IsGoodPatchedBlobId = false;
bool IsAllowedErasure = false;
bool IsSecured = false;
+ bool HasSlowVDisk = false;
+ bool IsContinuedVPatch = false;
+ bool IsMovedPatch = false;
#define PATCH_LOG(priority, service, marker, msg, ...) \
STLOG(priority, service, marker, msg, \
@@ -97,6 +126,15 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorActivePatch;
}
+ void ScheduleWakeUp(TInstant startTime, EWakeUpTag tag) {
+ TDuration duration = TActivationContext::Now() - startTime;
+ Schedule(duration, new TEvents::TEvWakeup(tag));
+ }
+
+ void ScheduleWakeUp(EWakeUpTag tag) {
+ ScheduleWakeUp(StageStart, tag);
+ }
+
static constexpr ERequestType RequestType() {
return ERequestType::Patch;
}
@@ -279,6 +317,12 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorType.ErasureFamily() != TErasureType::ErasureMirror) {
+ if (ReceivedFoundParts == SentStarts / 2 + SentStarts % 2) {
+ ScheduleWakeUp(VPatchStartTag);
+ }
+ }
+
NKikimrBlobStorage::TEvVPatchFoundParts &record = ev->Get()->Record;
Y_ABORT_UNLESS(record.HasCookie());
@@ -312,6 +356,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor(TStringBuilder() << ReceivedFoundParts << '/' << SentStarts)),
(ErrorReason, errorReason));
@@ -341,6 +386,13 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorType.ErasureFamily() != TErasureType::ErasureMirror) {
+ if (ReceivedResults == SentVPatchDiff / 2 + SentVPatchDiff % 2) {
+ ScheduleWakeUp(VPatchDiffTag);
+ }
+ }
+
PullOutStatusFlagsAndFressSpace(record);
Y_ABORT_UNLESS(record.HasStatus());
NKikimrProto::EReplyStatus status = record.GetStatus();
@@ -352,6 +404,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor(TStringBuilder() << ReceivedResults << '/' << Info->Type.TotalPartCount())),
(ErrorReason, errorReason));
@@ -499,6 +552,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor= dataParts, "vdiskIdx# " << vdiskIdx << " partIdx# " << partIdx);
placements.push_back(TPartPlacement{static_cast(vdiskIdx), static_cast(partIdx + 1)});
+ SentVPatchDiff++;
}
SendDiffs(placements);
}
@@ -537,15 +592,38 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor subgroupIdx = 0;
- ui32 subgroupIdx = 0;
if (OkVDisksWithParts) {
ui32 okVDiskIdx = RandomNumber(OkVDisksWithParts.size());
subgroupIdx = OkVDisksWithParts[okVDiskIdx];
} else {
+ ui64 worstNs = 0;
+ ui64 nextToWorstNs = 0;
+ i32 worstSubGroubIdx = -1;
+ GetWorstPredictedDelaysNs(NKikimrBlobStorage::EVDiskQueueId::PutAsyncBlob, &worstNs, &nextToWorstNs, &worstSubGroubIdx);
+ if (worstNs > nextToWorstNs * 2) {
+ SlowFlags[worstSubGroubIdx] = true;
+ HasSlowVDisk = true;
+ }
+ if (HasSlowVDisk) {
+ TStackVec goodDisks;
+ for (ui32 idx = 0; idx < VDisks.size(); ++idx) {
+ if (!SlowFlags[idx] && !ErrorResponseFlags[idx]) {
+ goodDisks.push_back(idx);
+ }
+ }
+ if (goodDisks.size()) {
+ ui32 okVDiskIdx = RandomNumber(goodDisks.size());
+ subgroupIdx = goodDisks[okVDiskIdx];
+ }
+ }
+ }
+ if (!subgroupIdx) {
subgroupIdx = RandomNumber(Info->Type.TotalPartCount());
}
- TVDiskID vDisk = Info->GetVDiskInSubgroup(subgroupIdx, OriginalId.Hash());
+ TVDiskID vDisk = Info->GetVDiskInSubgroup(*subgroupIdx, OriginalId.Hash());
TDeque> events;
ui64 cookie = ((ui64)OriginalId.Hash() << 32) | PatchedId.Hash();
@@ -574,7 +652,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorPatchesWithFallback->Inc();
- if (WithMovingPatchRequestToStaticNode && UseVPatch && !IsSecured) {
+ if (WithMovingPatchRequestToStaticNode && UseVPatch && !IsSecured && !IsMovedPatch) {
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA05, "Start Moved strategy from fallback");
StartMovedPatch();
} else {
@@ -587,20 +665,31 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorPickSubgroup(OriginalId.Hash(), &VDisks, nullptr);
ReceivedResponseFlags.assign(VDisks.size(), false);
ErrorResponseFlags.assign(VDisks.size(), false);
EmptyResponseFlags.assign(VDisks.size(), false);
ForceStopFlags.assign(VDisks.size(), false);
+ SlowFlags.assign(VDisks.size(), false);
+
+ ui64 worstNs = 0;
+ ui64 nextToWorstNs = 0;
+ i32 worstSubGroubIdx = -1;
+ GetWorstPredictedDelaysNs(NKikimrBlobStorage::EVDiskQueueId::GetFastRead, &worstNs, &nextToWorstNs, &worstSubGroubIdx);
+ if (worstNs > nextToWorstNs * 2) {
+ SlowFlags[worstSubGroubIdx] = true;
+ HasSlowVDisk = true;
+ }
TDeque> events;
-
for (ui32 idx = 0; idx < VDisks.size(); ++idx) {
- std::unique_ptr ev = std::make_unique(
- OriginalId, PatchedId, VDisks[idx], Deadline, idx, true);
- events.emplace_back(std::move(ev));
- SentStarts++;
+ if (!SlowFlags[idx]) {
+ std::unique_ptr ev = std::make_unique(
+ OriginalId, PatchedId, VDisks[idx], Deadline, idx, true);
+ events.emplace_back(std::move(ev));
+ SentStarts++;
+ }
}
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA08, "Start VPatch strategy",
@@ -701,6 +790,17 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor nextToWorstNs * 2) {
+ SlowFlags[worstSubGroubIdx] = true;
+ HasSlowVDisk = true;
+ }
if (Info->Type.GetErasure() == TErasureType::ErasureMirror3dc) {
return ContinueVPatchForMirror3dc();
@@ -713,6 +813,9 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorPickSubgroup(OriginalId.Hash(), &VDisks, nullptr);
IsSecured = (Info->GetEncryptionMode() != TBlobStorageGroupInfo::EEM_NONE);
IsGoodPatchedBlobId = result;
IsAllowedErasure = Info->Type.ErasureFamily() == TErasureType::ErasureParityBlock
|| Info->Type.GetErasure() == TErasureType::ErasureNone
|| Info->Type.GetErasure() == TErasureType::ErasureMirror3dc;
- if (IsGoodPatchedBlobId && IsAllowedErasure && UseVPatch && OriginalGroupId == Info->GroupID && !IsSecured) {
+ if (false && IsGoodPatchedBlobId && IsAllowedErasure && UseVPatch && OriginalGroupId == Info->GroupID && !IsSecured) {
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA03, "Start VPatch strategy from bootstrap");
StartVPatch();
} else {
@@ -825,6 +930,75 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorGetPredictedDelayNsByOrderNumber(diskIdx, queueId);;
+ if (predictedNs > *outWorstNs) {
+ *outNextToWorstNs = *outWorstNs;
+ *outWorstNs = predictedNs;
+ *outWorstSubgroupIdx = diskIdx;
+ } else if (predictedNs > *outNextToWorstNs) {
+ *outNextToWorstNs = predictedNs;
+ }
+ }
+ }
+
+ void SetSlowDisks() {
+ for (ui32 idx = 0; idx < SlowFlags.size(); ++idx) {
+ SlowFlags[idx] = !ReceivedResponseFlags[idx] && !EmptyResponseFlags[idx] && !ErrorResponseFlags[idx];
+ if (SlowFlags[idx]) {
+ HasSlowVDisk = true;
+ }
+ }
+ }
+
+ template
+ void HandleWakeUp(TEvents::TEvWakeup::TPtr &ev) {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA36, "HandleWakeUp",
+ (ExpectedTag, ToString(ExpectedTag)),
+ (ReceivedTag, ToString(ev->Get()->Tag)));
+ if (ev->Get()->Tag == ExpectedTag) {
+ SetSlowDisks();
+ StartFallback();
+ }
+ if (ev->Get()->Tag == NeverTag) {
+ SetSlowDisks();
+ StartFallback();
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA40, "Found NeverTag wake up", (ExpectedTag, ToString(ExpectedTag)));
+ }
+ }
+
+ void HandleVPatchWakeUp(TEvents::TEvWakeup::TPtr &ev) {
+ ui64 expectedTag = (IsContinuedVPatch ? VPatchDiffTag : VPatchStartTag);
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA37, "HandleWakeUp",
+ (ExpectedTag, ToString(expectedTag)),
+ (ReceivedTag, ToString(ev->Get()->Tag)));
+ if (ev->Get()->Tag == expectedTag) {
+ SetSlowDisks();
+ StartFallback();
+ }
+ if (ev->Get()->Tag == NeverTag) {
+ SetSlowDisks();
+ StartFallback();
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA41, "Found NeverTag wake up", (ExpectedTag, ToString(expectedTag)));
+ }
+ }
+
+ void HandleNeverTagWakeUp(TEvents::TEvWakeup::TPtr &ev) {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA42, "HandleWakeUp",
+ (ExpectedTag, ToString(NeverTag)),
+ (ReceivedTag, ToString(ev->Get()->Tag)));
+ if (ev->Get()->Tag == NeverTag) {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA43, "Found NeverTag wake up in naive state");
+ ReplyAndDie(NKikimrProto::DEADLINE);
+ }
+ }
+
STATEFN(NaiveState) {
if (ProcessEvent(ev)) {
return;
@@ -832,9 +1006,14 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorGetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvGetResult, Handle);
hFunc(TEvBlobStorage::TEvPutResult, Handle);
+
+ IgnoreFunc(TEvents::TEvWakeup);
+ //hFunc(TEvents::TEvWakeup, HandleWakeUp);
IgnoreFunc(TEvBlobStorage::TEvVPatchResult);
+ IgnoreFunc(TEvBlobStorage::TEvVPatchFoundParts);
+ IgnoreFunc(TEvBlobStorage::TEvVMovedPatchResult);
default:
- Y_ABORT("Received unknown event");
+ Y_FAIL_S("Received unknown event " << TypeName(*ev->GetBase()));
};
}
@@ -844,9 +1023,11 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorGetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvVMovedPatchResult, Handle);
+ hFunc(TEvents::TEvWakeup, HandleWakeUp);
IgnoreFunc(TEvBlobStorage::TEvVPatchResult);
+ IgnoreFunc(TEvBlobStorage::TEvVPatchFoundParts);
default:
- Y_ABORT("Received unknown event");
+ Y_FAIL_S("Received unknown event " << TypeName(*ev->GetBase()));
};
}
@@ -857,8 +1038,9 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorGetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvVPatchFoundParts, Handle);
hFunc(TEvBlobStorage::TEvVPatchResult, Handle);
+ hFunc(TEvents::TEvWakeup, HandleVPatchWakeUp);
default:
- Y_ABORT("Received unknown event");
+ Y_FAIL_S("Received unknown event " << TypeName(*ev->GetBase()));
};
}
};
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp
index d6b70cc6f9c2..741d5ce3b875 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp
@@ -110,6 +110,18 @@ enum class ENaivePatchCase {
ErrorOnPut,
};
+#define CASE_TO_RETURN_STRING(cs) \
+ case cs: return #cs \
+// end CASE_TO_RETURN_STRING
+TString ToString(ENaivePatchCase cs) {
+ switch (cs) {
+ CASE_TO_RETURN_STRING(ENaivePatchCase::Ok);
+ CASE_TO_RETURN_STRING(ENaivePatchCase::ErrorOnGetItem);
+ CASE_TO_RETURN_STRING(ENaivePatchCase::ErrorOnGet);
+ CASE_TO_RETURN_STRING(ENaivePatchCase::ErrorOnPut);
+ }
+}
+
NKikimrProto::EReplyStatus GetPatchResultStatus(ENaivePatchCase naiveCase) {
switch (naiveCase) {
case ENaivePatchCase::Ok:
@@ -156,6 +168,17 @@ enum class EVPatchCase {
Custom,
};
+TString ToString(EVPatchCase cs) {
+ switch (cs) {
+ CASE_TO_RETURN_STRING(EVPatchCase::Ok);
+ CASE_TO_RETURN_STRING(EVPatchCase::OneErrorAndAllPartExistInStart);
+ CASE_TO_RETURN_STRING(EVPatchCase::OnePartLostInStart);
+ CASE_TO_RETURN_STRING(EVPatchCase::DeadGroupInStart);
+ CASE_TO_RETURN_STRING(EVPatchCase::ErrorDuringVPatchDiff);
+ CASE_TO_RETURN_STRING(EVPatchCase::Custom);
+ }
+}
+
NKikimrProto::EReplyStatus GetPatchResultStatus(EVPatchCase vpatchCase) {
switch (vpatchCase) {
case EVPatchCase::Ok:
@@ -249,6 +272,15 @@ enum class EMovedPatchCase {
Error
};
+TString ToString(EMovedPatchCase cs) {
+ switch (cs) {
+ CASE_TO_RETURN_STRING(EMovedPatchCase::Ok);
+ CASE_TO_RETURN_STRING(EMovedPatchCase::Error);
+ }
+}
+
+#undef CASE_TO_RETURN_STRING
+
NKikimrProto::EReplyStatus GetPatchResultStatus(EMovedPatchCase movedCase) {
switch (movedCase) {
case EMovedPatchCase::Ok:
@@ -289,7 +321,7 @@ void ReceivePatchResult(TTestBasicRuntime &runtime, const TTestArgs &args, NKiki
}
void ConductGet(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCase naiveCase) {
- CTEST << "ConductGet: Start\n";
+ CTEST << "ConductGet: Start NaiveCase: " << ToString(naiveCase) << "\n";
NKikimrProto::EReplyStatus resultStatus = GetGetResultStatus(naiveCase);
TAutoPtr handle;
TEvBlobStorage::TEvGet *get = runtime.GrabEdgeEventRethrow(handle);
@@ -328,10 +360,10 @@ TString MakePatchedBuffer(const TTestArgs &args) {
void ConductPut(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCase naiveCase) {
NKikimrProto::EReplyStatus resultStatus = GetPutResultStatus(naiveCase);
if (resultStatus == NKikimrProto::UNKNOWN) {
- CTEST << "ConductPut: Skip\n";
+ CTEST << "ConductPut: Skip NaiveCase: " << ToString(naiveCase) << "\n";
return;
}
- CTEST << "ConductPut: Start\n";
+ CTEST << "ConductPut: Start NaiveCase: " << ToString(naiveCase) << "\n";
TAutoPtr handle;
TEvBlobStorage::TEvPut *put = runtime.GrabEdgeEventRethrow(handle);
UNIT_ASSERT_VALUES_EQUAL(put->Id, args.PatchedId);
@@ -346,7 +378,7 @@ void ConductPut(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCa
}
void ConductNaivePatch(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCase naiveCase) {
- CTEST << "ConductNaivePatch: Start\n";
+ CTEST << "ConductNaivePatch: Start NaiveCase: " << ToString(naiveCase) << Endl;
ConductGet(runtime, args, naiveCase);
ConductPut(runtime, args, naiveCase);
NKikimrProto::EReplyStatus resultStatus = GetPatchResultStatus(naiveCase);
@@ -354,14 +386,27 @@ void ConductNaivePatch(TTestBasicRuntime &runtime, const TTestArgs &args, ENaive
CTEST << "ConductNaivePatch: Finish\n";
}
+template
+TString ToString(const TVector &lst) {
+ TStringBuilder bld;
+ bld << '[';
+ for (ui32 idx = 0; idx < lst.size(); ++idx) {
+ if (idx) {
+ bld << ", ";
+ }
+ bld << lst[idx];
+ }
+ bld << ']';
+ return bld;
+}
void ConductVPatchStart(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
- EVPatchCase naiveCase, TVDiskPointer vdiskPointer)
+ EVPatchCase vpatchCase, TVDiskPointer vdiskPointer)
{
auto [vdiskIdx, idxInSubgroup] = vdiskPointer.GetIndecies(env, args.OriginalId.Hash());
- CTEST << "ConductVPatchStart: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n";
+ CTEST << "ConductVPatchStart: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << " VPatchCase: " << ToString(vpatchCase) << "\n";
TVDiskID vdisk = env.Info->GetVDiskInSubgroup(idxInSubgroup, args.OriginalId.Hash());
- auto [status, parts] = GetVPatchFoundPartsStatus(env, args, naiveCase, vdiskPointer);
+ auto [status, parts] = GetVPatchFoundPartsStatus(env, args, vpatchCase, vdiskPointer);
auto start = runtime.GrabEdgeEventRethrow({env.VDisks[vdiskIdx]});
auto &startRecord = start->Get()->Record;
@@ -376,21 +421,22 @@ void ConductVPatchStart(TTestBasicRuntime &runtime, const TDSProxyEnv &env, cons
for (auto partId : parts) {
foundParts->AddPart(partId);
}
+ CTEST << "ConductVPatchStart: Send FoundParts vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "parts# " << ToString(parts) << "\n";
SendByHandle(runtime, start, std::move(foundParts));
CTEST << "ConductVPatchStart: Finish vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n";
}
void ConductVPatchDiff(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
- EVPatchCase naiveCase, TVDiskPointer vdiskPointer)
+ EVPatchCase vpatchCase, TVDiskPointer vdiskPointer)
{
auto [vdiskIdx, idxInSubgroup] = vdiskPointer.GetIndecies(env, args.PatchedId.Hash());
TVDiskID vdisk = env.Info->GetVDiskInSubgroup(idxInSubgroup, args.PatchedId.Hash());
- NKikimrProto::EReplyStatus resultStatus = GetVPatchResultStatus(env, args, naiveCase, vdiskPointer);
+ NKikimrProto::EReplyStatus resultStatus = GetVPatchResultStatus(env, args, vpatchCase, vdiskPointer);
if (resultStatus == NKikimrProto::UNKNOWN) {
- CTEST << "ConductVPatchDiff: Skip vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n";
+ CTEST << "ConductVPatchDiff: Skip vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << " VPatchCase: " << ToString(vpatchCase) << "\n";
return;
}
- CTEST << "ConductVPatchDiff: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n";
+ CTEST << "ConductVPatchDiff: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << " VPatchCase: " << ToString(vpatchCase) << "\n";
auto diffEv = runtime.GrabEdgeEventRethrow({env.VDisks[vdiskIdx]});
auto &diffRecord = diffEv->Get()->Record;
@@ -415,6 +461,7 @@ void ConductVPatchDiff(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const
}
void ConductFailedVPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args) {
+ return; // disabled vpatch
CTEST << "ConductFailedVPatch: Start\n";
for (ui32 idxInSubgroup = 0; idxInSubgroup < args.GType.BlobSubgroupSize(); ++idxInSubgroup) {
TVDiskPointer vdisk = TVDiskPointer::GetVDiskIdx(idxInSubgroup);
@@ -429,7 +476,7 @@ void ConductFailedVPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, con
void ConductVMovedPatch(TTestBasicRuntime &runtime, const TTestArgs &args, EMovedPatchCase movedCase) {
- CTEST << "ConductVMovedPatch: Start\n";
+ CTEST << "ConductVMovedPatch: Start MovedPatchCase: " << ToString(movedCase) << Endl;
NKikimrProto::EReplyStatus resultStatus = GetVMovedPatchResultStatus(movedCase);
TAutoPtr handle;
TEvBlobStorage::TEvVMovedPatch *vPatch = runtime.GrabEdgeEventRethrow(handle);
@@ -459,7 +506,7 @@ void ConductVMovedPatch(TTestBasicRuntime &runtime, const TTestArgs &args, EMove
void ConductMovedPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
EMovedPatchCase movedCase)
{
- CTEST << "ConductMovedPatch: Start\n";
+ CTEST << "ConductMovedPatch: Start MovedPatchCase: " << ToString(movedCase) << Endl;
ConductFailedVPatch(runtime, env, args);
ConductVMovedPatch(runtime, args, movedCase);
NKikimrProto::EReplyStatus resultStatus = GetPatchResultStatus(movedCase);
@@ -481,7 +528,8 @@ void ConductFallbackPatch(TTestBasicRuntime &runtime, const TTestArgs &args) {
void ConductVPatchEvents(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
EVPatchCase vpatchCase)
{
- CTEST << "ConductVPatchEvents: Start\n";
+ return; // disabled vpatch
+ CTEST << "ConductVPatchEvents: Start VPatchCase: " << ToString(vpatchCase) << Endl;
for (ui32 idxInSubgroup = 0; idxInSubgroup < args.GType.BlobSubgroupSize(); ++idxInSubgroup) {
TVDiskPointer vdisk = TVDiskPointer::GetVDiskIdx(idxInSubgroup);
ConductVPatchStart(runtime, env, args, vpatchCase, vdisk);
@@ -496,7 +544,7 @@ void ConductVPatchEvents(TTestBasicRuntime &runtime, const TDSProxyEnv &env, con
void ConductVPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
EVPatchCase vpatchCase)
{
- CTEST << "ConductFallbackPatch: Start\n";
+ CTEST << "ConductFallbackPatch: Start VPatchCase: " << ToString(vpatchCase) << Endl;
ConductVPatchEvents(runtime, env, args, vpatchCase);
NKikimrProto::EReplyStatus resultStatus = GetPatchResultStatus(vpatchCase);
if (resultStatus == NKikimrProto::UNKNOWN) {
@@ -620,17 +668,18 @@ void RunGeneralTest(void(*runner)(TTestBasicRuntime &runtime, const TTestArgs &a
Y_UNIT_TEST_NAIVE(ErrorOnPut, erasure) \
Y_UNIT_TEST_MOVED(Ok, erasure) \
Y_UNIT_TEST_MOVED(Error, erasure) \
- Y_UNIT_TEST_VPATCH(Ok, erasure) \
- Y_UNIT_TEST_VPATCH(OneErrorAndAllPartExistInStart, erasure) \
- Y_UNIT_TEST_VPATCH(OnePartLostInStart, erasure) \
- Y_UNIT_TEST_VPATCH(DeadGroupInStart, erasure) \
- Y_UNIT_TEST_VPATCH(ErrorDuringVPatchDiff, erasure) \
Y_UNIT_TEST_SECURED(Ok, erasure) \
Y_UNIT_TEST_SECURED(ErrorOnGetItem, erasure) \
Y_UNIT_TEST_SECURED(ErrorOnGet, erasure) \
Y_UNIT_TEST_SECURED(ErrorOnPut, erasure) \
// end Y_UNIT_TEST_PATCH_PACK
+// Y_UNIT_TEST_VPATCH(Ok, erasure)
+// Y_UNIT_TEST_VPATCH(OneErrorAndAllPartExistInStart, erasure)
+// Y_UNIT_TEST_VPATCH(OnePartLostInStart, erasure)
+// Y_UNIT_TEST_VPATCH(DeadGroupInStart, erasure)
+// Y_UNIT_TEST_VPATCH(ErrorDuringVPatchDiff, erasure)
+
Y_UNIT_TEST_PATCH_PACK(ErasureNone)
Y_UNIT_TEST_PATCH_PACK(Erasure4Plus2Block)
Y_UNIT_TEST_PATCH_PACK(ErasureMirror3dc)
@@ -712,6 +761,7 @@ EFaultToleranceCase GetFaultToleranceCaseForBlock4Plus2(const TDSProxyEnv &env,
}
}
}
+ return EFaultToleranceCase::Fallback; // disabled vpatch
if (layout.CountEffectiveReplicas(env.Info->Type) == env.Info->Type.TotalPartCount()) {
return EFaultToleranceCase::Ok;
} else {
@@ -736,6 +786,7 @@ EFaultToleranceCase GetFaultToleranceCaseForMirror3dc(const TDSProxyEnv &env, co
for (ui32 dcIdx = 0; dcIdx < dcCnt; ++dcIdx) {
x2cnt += (replInDc[dcIdx] >= 2);
}
+ return EFaultToleranceCase::Fallback; // disabled vpatch
if ((replInDc[0] && replInDc[1] && replInDc[2]) || x2cnt >= 2) {
return EFaultToleranceCase::Ok;
} else {
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
index 754d0924d1b9..f2995a5b441f 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
@@ -179,6 +179,7 @@ void TNodeWarden::Bootstrap() {
icb->RegisterSharedControl(EnableSyncLogChunkCompressionSSD, "VDiskControls.EnableSyncLogChunkCompressionSSD");
icb->RegisterSharedControl(MaxSyncLogChunksInFlightHDD, "VDiskControls.MaxSyncLogChunksInFlightHDD");
icb->RegisterSharedControl(MaxSyncLogChunksInFlightSSD, "VDiskControls.MaxSyncLogChunksInFlightSSD");
+ icb->RegisterSharedControl(DefaultHugeGarbagePerMille, "VDiskControls.DefaultHugeGarbagePerMille");
icb->RegisterSharedControl(CostMetricsParametersByMedia[NPDisk::DEVICE_TYPE_ROT].BurstThresholdNs,
"VDiskControls.BurstThresholdNsHDD");
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
index 3d77ae4f1c7e..3a141f3b113c 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
@@ -139,6 +139,7 @@ namespace NKikimr::NStorage {
TControlWrapper EnableSyncLogChunkCompressionSSD;
TControlWrapper MaxSyncLogChunksInFlightHDD;
TControlWrapper MaxSyncLogChunksInFlightSSD;
+ TControlWrapper DefaultHugeGarbagePerMille;
TReplQuoter::TPtr ReplNodeRequestQuoter;
TReplQuoter::TPtr ReplNodeResponseQuoter;
@@ -162,6 +163,7 @@ namespace NKikimr::NStorage {
, EnableSyncLogChunkCompressionSSD(0, 0, 1)
, MaxSyncLogChunksInFlightHDD(10, 1, 1024)
, MaxSyncLogChunksInFlightSSD(10, 1, 1024)
+ , DefaultHugeGarbagePerMille(300, 1, 1000)
, CostMetricsParametersByMedia({
TCostMetricsParameters{200},
TCostMetricsParameters{50},
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
index 0497406a1332..75061a6bcc90 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
@@ -179,6 +179,7 @@ namespace NKikimr::NStorage {
vdiskConfig->EnableVDiskCooldownTimeout = Cfg->EnableVDiskCooldownTimeout;
vdiskConfig->ReplPausedAtStart = Cfg->VDiskReplPausedAtStart;
vdiskConfig->EnableVPatch = EnableVPatch;
+ vdiskConfig->DefaultHugeGarbagePerMille = DefaultHugeGarbagePerMille;
vdiskConfig->EnableLocalSyncLogDataCutting = EnableLocalSyncLogDataCutting;
if (deviceType == NPDisk::EDeviceType::DEVICE_TYPE_ROT) {
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
index 7f2792c639de..75a8429bee87 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
@@ -401,16 +401,15 @@ class TRealBlockDevice : public IBlockDevice {
}
EndOffset = op->GetOffset() + opSize;
- ui64 duration = HPNow() - completionAction->SubmitTime;
- ui64 durationMs = HPMilliSecondsFloat(duration);
+ double duration = HPMilliSecondsFloat(HPNow() - completionAction->SubmitTime);
if (op->GetType() == IAsyncIoOperation::EType::PRead) {
NSan::Unpoison(op->GetData(), opSize);
REQUEST_VALGRIND_MAKE_MEM_DEFINED(op->GetData(), opSize);
- Device.Mon.DeviceReadDuration.Increment(durationMs);
- LWPROBE(PDiskDeviceReadDuration, Device.GetPDiskId(), HPMilliSecondsFloat(duration), opSize);
+ Device.Mon.DeviceReadDuration.Increment(duration);
+ LWPROBE(PDiskDeviceReadDuration, Device.GetPDiskId(), duration, opSize);
} else {
- Device.Mon.DeviceWriteDuration.Increment(durationMs);
- LWPROBE(PDiskDeviceWriteDuration, Device.GetPDiskId(), HPMilliSecondsFloat(duration), opSize);
+ Device.Mon.DeviceWriteDuration.Increment(duration);
+ LWPROBE(PDiskDeviceWriteDuration, Device.GetPDiskId(), duration, opSize);
}
if (completionAction->FlushAction) {
ui64 idx = completionAction->FlushAction->OperationIdx;
@@ -668,8 +667,8 @@ class TRealBlockDevice : public IBlockDevice {
Device.IsTrimEnabled = Device.IoContext->DoTrim(op);
NHPTimer::STime endTime = HPNow();
Device.IdleCounter.Decrement();
- const ui64 durationUs = HPMicroSeconds(endTime - beginTime);
- Device.Mon.DeviceTrimDuration.Increment(durationUs);
+ const double duration = HPMilliSecondsFloat(endTime - beginTime);
+ Device.Mon.DeviceTrimDuration.Increment(duration);
*Device.Mon.DeviceEstimatedCostNs += completion->CostNs;
if (Device.ActorSystem && Device.IsTrimEnabled) {
LOG_DEBUG_S(*Device.ActorSystem, NKikimrServices::BS_DEVICE,
@@ -680,7 +679,7 @@ class TRealBlockDevice : public IBlockDevice {
<< "\" offset# " << op->GetOffset()
<< " size# " << op->GetSize());
LWPROBE(PDiskDeviceTrimDuration, Device.GetPDiskId(),
- HPMilliSecondsFloat(endTime - beginTime), op->GetOffset());
+ duration, op->GetOffset());
}
}
completion->SetResult(EIoResult::Ok);
diff --git a/ydb/core/blobstorage/ut_blobstorage/gc.cpp b/ydb/core/blobstorage/ut_blobstorage/gc.cpp
new file mode 100644
index 000000000000..80c779093259
--- /dev/null
+++ b/ydb/core/blobstorage/ut_blobstorage/gc.cpp
@@ -0,0 +1,22 @@
+#include
+
+Y_UNIT_TEST_SUITE(GarbageCollection) {
+ Y_UNIT_TEST(EmptyGcCmd) {
+ TEnvironmentSetup env({
+ .Erasure = TBlobStorageGroupType::Erasure4Plus2Block,
+ });
+ auto& runtime = env.Runtime;
+
+ env.CreateBoxAndPool(1, 1);
+ auto info = env.GetGroupInfo(env.GetGroups().front());
+
+ auto ev = std::make_unique(1u, 1u, 1u, 0u, false, 0u, 0u, nullptr, nullptr,
+ TInstant::Max(), true);
+ const TActorId edge = runtime->AllocateEdgeActor(1, __FILE__, __LINE__);
+ runtime->WrapInActorContext(edge, [&] {
+ SendToBSProxy(edge, info->GroupID, ev.release());
+ });
+ auto res = env.WaitForEdgeActorEvent(edge);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::ERROR);
+ }
+}
diff --git a/ydb/core/blobstorage/ut_blobstorage/ya.make b/ydb/core/blobstorage/ut_blobstorage/ya.make
index 0edf9906ed95..ba965e9e83ca 100644
--- a/ydb/core/blobstorage/ut_blobstorage/ya.make
+++ b/ydb/core/blobstorage/ut_blobstorage/ya.make
@@ -22,6 +22,7 @@ SRCS(
ds_proxy_lwtrace.cpp
encryption.cpp
extra_block_checks.cpp
+ gc.cpp
gc_quorum_3dc.cpp
get.cpp
group_reconfiguration.cpp
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_config.h b/ydb/core/blobstorage/vdisk/common/vdisk_config.h
index 31e30788eeb1..73b1d27daaa1 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_config.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_config.h
@@ -218,6 +218,7 @@ namespace NKikimr {
TDuration WhiteboardUpdateInterval;
bool EnableVDiskCooldownTimeout;
TControlWrapper EnableVPatch = true;
+ TControlWrapper DefaultHugeGarbagePerMille;
///////////// COST METRICS SETTINGS ////////////////
bool UseCostTracker = true;
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
index f31c02db3bcf..98473e08648b 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
@@ -1585,7 +1585,7 @@ namespace NKikimr {
if (deadline != TInstant::Max()) {
this->Record.MutableMsgQoS()->SetDeadlineSeconds((ui32)deadline.Seconds());
}
- this->Record.MutableMsgQoS()->SetExtQueueId(HandleClassToQueueId(NKikimrBlobStorage::AsyncBlob));
+ this->Record.MutableMsgQoS()->SetExtQueueId(NKikimrBlobStorage::PutAsyncBlob);
}
bool GetIgnoreBlock() const {
@@ -1965,6 +1965,25 @@ namespace NKikimr {
}
Record.MutableMsgQoS()->SetExtQueueId(NKikimrBlobStorage::EVDiskQueueId::GetFastRead);
}
+
+ TString ToString() const {
+ return ToString(this->Record);
+ }
+
+ static TString ToString(const NKikimrBlobStorage::TEvVPatchStart &record) {
+ TStringStream str;
+ TLogoBlobID originalId = LogoBlobIDFromLogoBlobID(record.GetOriginalBlobId());
+ TLogoBlobID patchedId = LogoBlobIDFromLogoBlobID(record.GetPatchedBlobId());
+ str << "{TEvVPatchStart";
+ str << " OriginalBlobId# " << originalId.ToString();
+ str << " PatchedBlobId# " << patchedId.ToString();
+ if (record.HasMsgQoS()) {
+ str << " ";
+ TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), str);
+ }
+ str << "}";
+ return str.Str();
+ }
};
struct TEvBlobStorage::TEvVPatchFoundParts
@@ -2010,6 +2029,25 @@ namespace NKikimr {
Record.SetStatus(status);
}
+ TString ToString() const {
+ return ToString(this->Record);
+ }
+
+ static TString ToString(const NKikimrBlobStorage::TEvVPatchFoundParts &record) {
+ TStringStream str;
+ TLogoBlobID originalId = LogoBlobIDFromLogoBlobID(record.GetOriginalBlobId());
+ TLogoBlobID patchedId = LogoBlobIDFromLogoBlobID(record.GetPatchedBlobId());
+ str << "{TEvVPatchFoundParts";
+ str << " OriginalBlobId# " << originalId.ToString();
+ str << " PatchedBlobId# " << patchedId.ToString();
+ if (record.HasMsgQoS()) {
+ str << " ";
+ TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), str);
+ }
+ str << "}";
+ return str.Str();
+ }
+
void MakeError(NKikimrProto::EReplyStatus status, const TString& errorReason,
const NKikimrBlobStorage::TEvVPatchStart &request) {
Record.SetErrorReason(errorReason);
@@ -2099,6 +2137,25 @@ namespace NKikimr {
}
return result;
}
+
+ TString ToString() const {
+ return ToString(this->Record);
+ }
+
+ static TString ToString(const NKikimrBlobStorage::TEvVPatchDiff &record) {
+ TStringStream str;
+ TLogoBlobID originalId = LogoBlobIDFromLogoBlobID(record.GetOriginalPartBlobId());
+ TLogoBlobID patchedId = LogoBlobIDFromLogoBlobID(record.GetPatchedPartBlobId());
+ str << "{TEvVPatchDiff";
+ str << " OriginalBlobId# " << originalId.ToString();
+ str << " PatchedBlobId# " << patchedId.ToString();
+ if (record.HasMsgQoS()) {
+ str << " ";
+ TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), str);
+ }
+ str << "}";
+ return str.Str();
+ }
};
@@ -2144,6 +2201,25 @@ namespace NKikimr {
}
return result;
}
+
+ TString ToString() const {
+ return ToString(this->Record);
+ }
+
+ static TString ToString(const NKikimrBlobStorage::TEvVPatchXorDiff &record) {
+ TStringStream str;
+ TLogoBlobID originalId = LogoBlobIDFromLogoBlobID(record.GetOriginalPartBlobId());
+ TLogoBlobID patchedId = LogoBlobIDFromLogoBlobID(record.GetPatchedPartBlobId());
+ str << "{TEvVPatchXorDiff";
+ str << " OriginalBlobId# " << originalId.ToString();
+ str << " PatchedBlobId# " << patchedId.ToString();
+ if (record.HasMsgQoS()) {
+ str << " ";
+ TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), str);
+ }
+ str << "}";
+ return str.Str();
+ }
};
struct TEvBlobStorage::TEvVPatchXorDiffResult
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp
index fa7014b64837..54a7d2cfaf53 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp
@@ -16,12 +16,14 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////
TDefragCtx::TDefragCtx(
const TIntrusivePtr &vctx,
+ const TIntrusivePtr &vconfig,
const std::shared_ptr &hugeBlobCtx,
const TPDiskCtxPtr &pdiskCtx,
const TActorId &skeletonId,
const TActorId &hugeKeeperId,
bool runDefrageBySchedule)
: VCtx(vctx)
+ , VCfg(vconfig)
, HugeBlobCtx(hugeBlobCtx)
, PDiskCtx(pdiskCtx)
, SkeletonId(skeletonId)
@@ -48,7 +50,8 @@ namespace NKikimr {
bool HugeHeapDefragmentationRequired(
const TOutOfSpaceState& oos,
ui32 hugeCanBeFreedChunks,
- ui32 hugeTotalChunks) {
+ ui32 hugeTotalChunks,
+ double defaultPercent) {
if (hugeCanBeFreedChunks < 10)
return false;
@@ -56,11 +59,14 @@ namespace NKikimr {
double percentOfGarbage = static_cast(hugeCanBeFreedChunks) / hugeTotalChunks;
if (oos.GetLocalColor() > TSpaceColor::CYAN) {
- return percentOfGarbage >= 0.02;
+ // For anything worse than CYAN
+ return percentOfGarbage >= Min(0.02, defaultPercent);
} else if (oos.GetLocalColor() > TSpaceColor::GREEN) {
- return percentOfGarbage >= 0.15;
+ // For CYAN
+ return percentOfGarbage >= Min(0.15, defaultPercent);
} else {
- return percentOfGarbage >= 0.30;
+ // For GREEN
+ return percentOfGarbage >= Min(0.30, defaultPercent);
}
}
@@ -113,7 +119,8 @@ namespace NKikimr {
const auto& oos = DCtx->VCtx->GetOutOfSpaceState();
Y_ABORT_UNLESS(usefulChunks <= totalChunks);
const ui32 canBeFreedChunks = totalChunks - usefulChunks;
- if (HugeHeapDefragmentationRequired(oos, canBeFreedChunks, totalChunks)) {
+ double defaultPercent = DCtx->VCfg->DefaultHugeGarbagePerMille / 1000.0;
+ if (HugeHeapDefragmentationRequired(oos, canBeFreedChunks, totalChunks, defaultPercent)) {
TChunksToDefrag chunksToDefrag = calcStat.GetChunksToDefrag(DCtx->MaxChunksToDefrag);
Y_ABORT_UNLESS(chunksToDefrag);
STLOG(PRI_INFO, BS_VDISK_DEFRAG, BSVDD03, VDISKP(DCtx->VCtx->VDiskLogPrefix, "scan finished"),
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_actor.h b/ydb/core/blobstorage/vdisk/defrag/defrag_actor.h
index f59ecee374c6..08c451b094e7 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_actor.h
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_actor.h
@@ -18,6 +18,7 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////
struct TDefragCtx {
const TIntrusivePtr VCtx;
+ const TIntrusivePtr VCfg;
const std::shared_ptr HugeBlobCtx;
const TPDiskCtxPtr PDiskCtx;
const TActorId SkeletonId;
@@ -30,6 +31,7 @@ namespace NKikimr {
TDefragCtx(
const TIntrusivePtr &vctx,
+ const TIntrusivePtr &vconfig,
const std::shared_ptr &hugeBlobCtx,
const TPDiskCtxPtr &pdiskCtx,
const TActorId &skeletonId,
@@ -45,7 +47,8 @@ namespace NKikimr {
bool HugeHeapDefragmentationRequired(
const TOutOfSpaceState& oos,
ui32 hugeCanBeFreedChunks,
- ui32 hugeTotalChunks);
+ ui32 hugeTotalChunks,
+ double defaultPercent);
////////////////////////////////////////////////////////////////////////////
// VDISK DEFRAG ACTOR CREATOR
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_actor_ut.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_actor_ut.cpp
index ebcfec56195b..fa059a1d2c8e 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_actor_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_actor_ut.cpp
@@ -15,21 +15,21 @@ namespace NKikimr {
TOutOfSpaceState oos(1, 0);
ui32 hugeCanBeFreedChunks = 9;
ui32 hugeUsedChunks = 20;
- bool defrag = HugeHeapDefragmentationRequired(oos, hugeCanBeFreedChunks, hugeUsedChunks);
+ bool defrag = HugeHeapDefragmentationRequired(oos, hugeCanBeFreedChunks, hugeUsedChunks, 0.30);
UNIT_ASSERT(!defrag);
}
{
TOutOfSpaceState oos(1, 0);
ui32 hugeCanBeFreedChunks = 200;
ui32 hugeUsedChunks = 1000;
- bool defrag = HugeHeapDefragmentationRequired(oos, hugeCanBeFreedChunks, hugeUsedChunks);
+ bool defrag = HugeHeapDefragmentationRequired(oos, hugeCanBeFreedChunks, hugeUsedChunks, 0.30);
UNIT_ASSERT(!defrag);
}
{
TOutOfSpaceState oos(1, 0);
ui32 hugeCanBeFreedChunks = 301;
ui32 hugeUsedChunks = 1000;
- bool defrag = HugeHeapDefragmentationRequired(oos, hugeCanBeFreedChunks, hugeUsedChunks);
+ bool defrag = HugeHeapDefragmentationRequired(oos, hugeCanBeFreedChunks, hugeUsedChunks, 0.30);
UNIT_ASSERT(defrag);
}
}
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp
index 1e1753bf6a99..9f3cb4006e1b 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp
@@ -95,7 +95,7 @@ namespace NKikimr {
Compact();
auto hugeStat = GetHugeStat();
- Y_ABORT_UNLESS(hugeStat.LockedChunks.size() < 100);
+ Y_DEBUG_ABORT_UNLESS(hugeStat.LockedChunks.size() < 100);
}
Send(ParentActorId, new TEvDefragQuantumResult(std::move(stat)));
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
index fedaca042103..a1eeb6af94fb 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
@@ -413,6 +413,13 @@ namespace NKikimr {
if (!CheckGC(ctx, record))
return {NKikimrProto::ERROR, 0, false}; // record has duplicates
+ if (!collect && !record.KeepSize() && !record.DoNotKeepSize()) {
+ LOG_ERROR_S(ctx, NKikimrServices::BS_HULLRECS, HullDs->HullCtx->VCtx->VDiskLogPrefix
+ << "Db# Barriers ValidateGCCmd: empty garbage collection command"
+ << " TabletId# " << tabletID);
+ return {NKikimrProto::ERROR, "empty garbage collection command"};
+ }
+
auto blockStatus = THullDbRecovery::IsBlocked(record);
switch (blockStatus.Status) {
case TBlocksCache::EStatus::OK:
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
index bdec548c93e9..45f7f4250a02 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
@@ -425,8 +425,8 @@ namespace NKikimr {
// no more blobs to replicate; replication will not resume
State = Finished;
ReplCtx->MonGroup.ReplUnreplicatedVDisks() = 0;
- ReplCtx->MonGroup.ReplUnreplicatedPhantoms() = 1;
- ReplCtx->MonGroup.ReplUnreplicatedNonPhantoms() = 1;
+ ReplCtx->MonGroup.ReplUnreplicatedPhantoms() = 0;
+ ReplCtx->MonGroup.ReplUnreplicatedNonPhantoms() = 0;
ReplCtx->MonGroup.ReplWorkUnitsRemaining() = 0;
ReplCtx->MonGroup.ReplWorkUnitsDone() = 0;
ReplCtx->MonGroup.ReplItemsRemaining() = 0;
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
index 043b5300e308..c7a7ff970a4c 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
@@ -225,12 +225,19 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////
void Handle(TEvBlobStorage::TEvVMovedPatch::TPtr &ev, const TActorContext &ctx) {
+ LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVMovedPatch: receive request;"
+ << " Event# " << ev->Get()->ToString());
if (!CheckIfWriteAllowed(ev, ctx)) {
+ LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVMovedPatch: is not allowed;"
+ << " Event# " << ev->Get()->ToString());
return;
}
const bool postpone = OverloadHandler->PostponeEvent(ev);
if (!postpone) {
PrivateHandle(ev, ctx);
+ } else {
+ LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVMovedPatch: is postponned;"
+ << " Event# " << ev->Get()->ToString());
}
}
@@ -270,11 +277,16 @@ namespace NKikimr {
void Handle(TEvBlobStorage::TEvVPatchStart::TPtr &ev, const TActorContext &ctx) {
if (!CheckIfWriteAllowed(ev, ctx)) {
+ LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatchStart: receive request;"
+ << " Event# " << ev->Get()->ToString());
return;
}
const bool postpone = OverloadHandler->PostponeEvent(ev);
if (!postpone) {
PrivateHandle(ev, ctx);
+ } else {
+ LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatchStart: postponned;"
+ << " Event# " << ev->Get()->ToString());
}
}
@@ -307,24 +319,32 @@ namespace NKikimr {
template
void HandleVPatchDiffResending(TEvDiffPtr &ev, const TActorContext &ctx) {
if (!CheckIfWriteAllowed(ev, ctx)) {
+ LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatch: is not allowed;"
+ << " Event# " << ev->Get()->ToString());
return;
}
if constexpr (std::is_same_v) {
LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatch: recieve diff;"
<< " Event# " << ev->Get()->ToString());
IFaceMonGroup->PatchDiffMsgs()++;
- }
- if constexpr (std::is_same_v) {
+ } else if constexpr (std::is_same_v) {
LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatch: recieve xor diff;"
<< " Event# " << ev->Get()->ToString());
IFaceMonGroup->PatchXorDiffMsgs()++;
+ } else {
+ LOG_ERROR_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatch: UNKNOWN diff;"
+ << " Event# " << ev->Get()->ToString());
}
TLogoBlobID patchedBlobId = LogoBlobIDFromLogoBlobID(ev->Get()->Record.GetPatchedPartBlobId()).FullID();
auto it = VPatchActors.find(patchedBlobId);
if (it != VPatchActors.end()) {
TActivationContext::Send(ev->Forward(it->second));
+ LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatch: diff sent to actor;"
+ << " Event# " << ev->Get()->ToString());
} else {
ReplyError(NKikimrProto::ERROR, "VPatchActor doesn't exist", ev, ctx, TAppData::TimeProvider->Now());
+ LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatch: diff didn't send to actor; actor didn't exist"
+ << " Event# " << ev->Get()->ToString());
}
}
@@ -1789,7 +1809,7 @@ namespace NKikimr {
}
void StartDefrag(const TActorContext &ctx) {
- auto defragCtx = std::make_shared(VCtx, HugeBlobCtx, PDiskCtx, ctx.SelfID,
+ auto defragCtx = std::make_shared(VCtx, Config, HugeBlobCtx, PDiskCtx, ctx.SelfID,
Db->HugeKeeperID, true);
DefragId = ctx.Register(CreateDefragActor(defragCtx, GInfo));
ActiveActors.Insert(DefragId, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); // keep forever
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp
index db94713be486..a0b22acc05a0 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp
@@ -9,8 +9,6 @@ namespace NKikimr {
class TVMovedPatchActor : public TActorBootstrapped {
friend TActorBootstrapped;
- static constexpr ui64 SubRequestDurationMs = 1000;
-
ui32 OriginalGroupId;
ui32 PatchedGroupId;
TLogoBlobID OriginalId;
@@ -29,6 +27,7 @@ namespace NKikimr {
TActorId LeaderId;
TOutOfSpaceStatus OOSStatus;
+ TInstant Deadline = TInstant::Zero();
NLWTrace::TOrbit Orbit;
@@ -58,6 +57,10 @@ namespace NKikimr {
OriginalId = LogoBlobIDFromLogoBlobID(record.GetOriginalBlobId());
Y_ABORT_UNLESS(record.HasPatchedBlobId());
PatchedId = LogoBlobIDFromLogoBlobID(record.GetPatchedBlobId());
+ Deadline = TInstant::Seconds(record.GetMsgQoS().HasDeadlineSeconds());
+ if (record.HasMsgQoS() && record.GetMsgQoS().HasDeadlineSeconds()) {
+ Deadline = TInstant::Seconds(record.GetMsgQoS().HasDeadlineSeconds());
+ }
DiffCount = record.DiffsSize();
Diffs.reset(new TEvBlobStorage::TEvPatch::TDiff[DiffCount]);
@@ -96,6 +99,12 @@ namespace NKikimr {
<< " ErrorReason# " << ErrorReason
<< " Marker# BSVSP01");
}
+ LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_PATCH, VCtx->VDiskLogPrefix
+ << "Send result TEvVMovedPatch: " << errorSubMsg << ';'
+ << " OriginalBlobId# " << OriginalId
+ << " PatchedBlobId# " << PatchedId
+ << " ErrorReason# " << ErrorReason
+ << " Marker# BSVSP01");
SendVDiskResponse(ctx, Event->Sender, vMovedPatchResult.release(), Event->Cookie, VCtx);
PassAway();
}
@@ -108,6 +117,10 @@ namespace NKikimr {
}
void Handle(TEvBlobStorage::TEvGetResult::TPtr &ev, const TActorContext &ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_PATCH, VCtx->VDiskLogPrefix
+ << "Receive Get ub TEvVMovedPatch: "
+ << " OriginalBlobId# " << OriginalId
+ << " PatchedBlobId# " << PatchedId);
TEvBlobStorage::TEvGetResult *result = ev->Get();
Orbit = std::move(result->Orbit);
@@ -138,15 +151,18 @@ namespace NKikimr {
Buffer = result->Responses[0].Buffer.ConvertToString();
ApplyDiffs();
- TInstant deadline = TActivationContext::Now() + TDuration::MilliSeconds(SubRequestDurationMs);
// We have chosen UserData as PutHandleClass on purpose.
// If VMovedPatch and Put were AsyncWrite, it would become a deadlock
// because the put subrequest may not send and the moved patch request will end by timeout.
- std::unique_ptr put = std::make_unique(PatchedId, Buffer, deadline,
+ std::unique_ptr put = std::make_unique(PatchedId, Buffer, Deadline,
NKikimrBlobStorage::UserData, TEvBlobStorage::TEvPut::TacticDefault);
put->Orbit = std::move(Orbit);
+ LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_PATCH, VCtx->VDiskLogPrefix
+ << "Send Put ub TEvVMovedPatch: "
+ << " OriginalBlobId# " << OriginalId
+ << " PatchedBlobId# " << PatchedId);
SendToBSProxy(SelfId(), PatchedGroupId, put.release(), OriginalId.Hash());
}
@@ -156,6 +172,11 @@ namespace NKikimr {
ui32 originalIdHash = OriginalId.Hash();
+ LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_PATCH, VCtx->VDiskLogPrefix
+ << "Receive Put ub TEvVMovedPatch: "
+ << " OriginalBlobId# " << OriginalId
+ << " PatchedBlobId# " << PatchedId);
+
constexpr auto errorSubMsg = "failed on VPut";
if (ev->Cookie != originalIdHash) {
ErrorReason = "Couldn't put the patched blob; Received TEvPutResult with wrong cookie";
@@ -173,11 +194,20 @@ namespace NKikimr {
}
void Bootstrap() {
- TInstant deadline = TActivationContext::Now() + TDuration::MilliSeconds(SubRequestDurationMs);
+ if (Deadline && Deadline < TActivationContext::Now()) {
+ SendResponseAndDie(TActivationContext::AsActorContext(), NKikimrProto::DEADLINE);
+ return;
+ }
+
std::unique_ptr get = std::make_unique(OriginalId, 0,
- OriginalId.BlobSize(), deadline, NKikimrBlobStorage::AsyncRead);
+ OriginalId.BlobSize(), Deadline, NKikimrBlobStorage::AsyncRead);
get->Orbit = std::move(Event->Get()->Orbit);
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::BS_VDISK_PATCH, VCtx->VDiskLogPrefix
+ << "Send Get ub TEvVMovedPatch: "
+ << " OriginalBlobId# " << OriginalId
+ << " PatchedBlobId# " << PatchedId);
+
SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash());
Become(&TThis::StateWait);
}
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp
index 5c4325e85ced..05f6f035277a 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp
@@ -306,7 +306,7 @@ namespace NKikimr::NPrivate {
void SendVPatchResult(NKikimrProto::EReplyStatus status, bool forceEnd = false)
{
STLOG(PRI_INFO, BS_VDISK_PATCH, BSVSP07,
- VDiskLogPrefix << " TEvVPatch: send patch result;",
+ VDiskLogPrefix << " TEvVPatch: " << (forceEnd ? "received force end;" : "send patch result;"),
(OriginalBlobId, OriginalBlobId),
(PatchedBlobId, PatchedBlobId),
(OriginalPartId, (ui32)OriginalPartId),
diff --git a/ydb/core/change_exchange/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h
index 8c9f45a1c698..823b208f5323 100644
--- a/ydb/core/change_exchange/change_sender_common_ops.h
+++ b/ydb/core/change_exchange/change_sender_common_ops.h
@@ -336,7 +336,7 @@ class TBaseChangeSender {
Y_ABORT_UNLESS(it != Broadcasting.end());
auto& broadcast = it->second;
- if (broadcast.Partitions.contains(partitionId)) {
+ if (broadcast.CompletedPartitions.contains(partitionId)) {
return false;
}
diff --git a/ydb/core/config/init/init.cpp b/ydb/core/config/init/init.cpp
index 9890ccabc480..058ef1d3af3e 100644
--- a/ydb/core/config/init/init.cpp
+++ b/ydb/core/config/init/init.cpp
@@ -48,8 +48,8 @@ class TDefaultProtoConfigFileProvider
static bool IsFileReadable(const fs::path& p) {
std::error_code ec; // For noexcept overload usage.
auto perms = fs::status(p, ec).permissions();
- if ((perms & fs::perms::owner_read) != fs::perms::none &&
- (perms & fs::perms::group_read) != fs::perms::none &&
+ if ((perms & fs::perms::owner_read) != fs::perms::none ||
+ (perms & fs::perms::group_read) != fs::perms::none ||
(perms & fs::perms::others_read) != fs::perms::none )
{
return true;
diff --git a/ydb/core/config/init/init_impl.h b/ydb/core/config/init/init_impl.h
index c2a8a9fee2fd..9da49a0e5584 100644
--- a/ydb/core/config/init/init_impl.h
+++ b/ydb/core/config/init/init_impl.h
@@ -1119,7 +1119,7 @@ class TInitialConfiguratorImpl
}
void FillData(const NConfig::TCommonAppOptions& cf) {
- if (cf.TenantName && ScopeId.IsEmpty()) {
+ if (!cf.TenantName && ScopeId.IsEmpty()) {
const TString myDomain = DeduceNodeDomain(cf, AppConfig);
for (const auto& domain : AppConfig.GetDomainsConfig().GetDomain()) {
if (domain.GetName() == myDomain) {
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index f482af16e987..53a4ee27983c 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -2149,7 +2149,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
// Create resource manager
auto rm = NKqp::CreateKqpResourceManagerActor(Config.GetTableServiceConfig().GetResourceManager(), nullptr,
- {}, kqpProxySharedResources);
+ {}, kqpProxySharedResources, NodeId);
setup->LocalServices.push_back(std::make_pair(
NKqp::MakeKqpRmServiceID(NodeId),
TActorSetupCmd(rm, TMailboxType::HTSwap, appData->UserPoolId)));
diff --git a/ydb/core/driver_lib/version/version.cpp b/ydb/core/driver_lib/version/version.cpp
index adc22dfeeca6..85effd8a57d0 100644
--- a/ydb/core/driver_lib/version/version.cpp
+++ b/ydb/core/driver_lib/version/version.cpp
@@ -26,7 +26,8 @@ TCompatibilityInfo::TCompatibilityInfo() {
auto current = MakeCurrent();
- // bool success = CompleteFromTag(current);
+ bool success = CompleteFromTag(current);
+ Y_UNUSED(success);
// Y_ABORT_UNLESS(success);
CurrentCompatibilityInfo.CopyFrom(current);
@@ -72,12 +73,13 @@ const TStored* TCompatibilityInfo::GetDefault(TComponentId componentId) const {
// obsolete version control
TMaybe VERSION = NActors::TInterconnectProxyCommon::TVersionInfo{
// version of this binary
- "trunk",
+ "stable-24-3",
// compatible versions; must include all compatible old ones, including this one; version verification occurs on both
// peers and connection is accepted if at least one of peers accepts the version of the other peer
{
- "trunk"
+ "stable-24-2",
+ "stable-24-3"
}
};
diff --git a/ydb/core/engine/mkql_keys.cpp b/ydb/core/engine/mkql_keys.cpp
index 93d70dcfcbb3..d282ccf5f40f 100644
--- a/ydb/core/engine/mkql_keys.cpp
+++ b/ydb/core/engine/mkql_keys.cpp
@@ -51,14 +51,6 @@ NScheme::TTypeInfo UnpackTypeInfo(NKikimr::NMiniKQL::TType *type, bool &isOption
}
}
-
-template
-TCell MakeCell(const NUdf::TUnboxedValuePod& value) {
- static_assert(TCell::CanInline(sizeof(T)), "Can't inline data in cell.");
- const auto v = value.Get();
- return TCell(reinterpret_cast(&v), sizeof(v));
-}
-
THolder ExtractKeyTuple(const TTableId& tableId, TTupleLiteral* tuple,
const TVector& columns,
TKeyDesc::ERowOperation rowOperation, bool requireStaticKey, const TTypeEnvironment& env) {
diff --git a/ydb/core/engine/mkql_keys.h b/ydb/core/engine/mkql_keys.h
index 517120748a77..b51e789d0182 100644
--- a/ydb/core/engine/mkql_keys.h
+++ b/ydb/core/engine/mkql_keys.h
@@ -45,6 +45,13 @@ THolder ExtractTableKey(TCallable& callable, const TTableStrings& stri
TVector> ExtractTableKeys(TExploringNodeVisitor& explorer, const TTypeEnvironment& env);
TTableId ExtractTableId(const TRuntimeNode& node);
+template
+TCell MakeCell(const NUdf::TUnboxedValuePod& value) {
+ static_assert(TCell::CanInline(sizeof(T)), "Can't inline data in cell.");
+ const auto v = value.Get();
+ return TCell(reinterpret_cast(&v), sizeof(v));
+}
+
TCell MakeCell(NScheme::TTypeInfo type, const NUdf::TUnboxedValuePod& value,
const TTypeEnvironment& env, bool copy = true,
i32 typmod = -1, TMaybe* error = {});
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index 2ee5626e5d40..70ed133e6366 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -548,7 +548,7 @@ std::shared_ptr DefaultScalar(const std::shared_ptrToString());
return out;
}
@@ -634,6 +634,19 @@ int ScalarCompare(const std::shared_ptr& x, const std::shared_ptr
return ScalarCompare(*x, *y);
}
+int ScalarCompareNullable(const std::shared_ptr& x, const std::shared_ptr& y) {
+ if (!x && !!y) {
+ return -1;
+ }
+ if (!!x && !y) {
+ return 1;
+ }
+ if (!x && !y) {
+ return 0;
+ }
+ return ScalarCompare(*x, *y);
+}
+
std::shared_ptr SortBatch(const std::shared_ptr& batch,
const std::shared_ptr& sortingKey, const bool andUnique) {
auto sortPermutation = MakeSortPermutation(batch, sortingKey, andUnique);
@@ -892,24 +905,33 @@ std::shared_ptr MergeColumns(const std::vector> SliceToRecordBatches(const std::shared_ptr& t) {
- std::set splitPositions;
- const ui32 numRows = t->num_rows();
- for (auto&& i : t->columns()) {
- ui32 pos = 0;
- for (auto&& arr : i->chunks()) {
- splitPositions.emplace(pos);
- pos += arr->length();
+ if (!t->num_rows()) {
+ return {};
+ }
+ std::vector positions;
+ {
+ for (auto&& i : t->columns()) {
+ ui32 pos = 0;
+ for (auto&& arr : i->chunks()) {
+ positions.emplace_back(pos);
+ pos += arr->length();
+ }
+ AFL_VERIFY(pos == t->num_rows());
}
- AFL_VERIFY(pos == t->num_rows());
+ positions.emplace_back(t->num_rows());
}
+ std::sort(positions.begin(), positions.end());
+ positions.erase(std::unique(positions.begin(), positions.end()), positions.end());
+
std::vector>> slicedData;
- slicedData.resize(splitPositions.size());
- std::vector positions(splitPositions.begin(), splitPositions.end());
- for (auto&& i : t->columns()) {
- for (ui32 idx = 0; idx < positions.size(); ++idx) {
- auto slice = i->Slice(positions[idx], ((idx + 1 == positions.size()) ? numRows : positions[idx + 1]) - positions[idx]);
- AFL_VERIFY(slice->num_chunks() == 1);
- slicedData[idx].emplace_back(slice->chunks().front());
+ slicedData.resize(positions.size() - 1);
+ {
+ for (auto&& i : t->columns()) {
+ for (ui32 idx = 0; idx + 1 < positions.size(); ++idx) {
+ auto slice = i->Slice(positions[idx], positions[idx + 1] - positions[idx]);
+ AFL_VERIFY(slice->num_chunks() == 1);
+ slicedData[idx].emplace_back(slice->chunks().front());
+ }
}
}
std::vector> result;
diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h
index f6f4fd0c18a0..584803598daf 100644
--- a/ydb/core/formats/arrow/arrow_helpers.h
+++ b/ydb/core/formats/arrow/arrow_helpers.h
@@ -98,6 +98,7 @@ std::shared_ptr GetScalar(const std::shared_ptr& ar
bool IsGoodScalar(const std::shared_ptr& x);
int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y);
int ScalarCompare(const std::shared_ptr& x, const std::shared_ptr& y);
+int ScalarCompareNullable(const std::shared_ptr& x, const std::shared_ptr& y);
std::partial_ordering ColumnsCompare(const std::vector>& x, const ui32 xRow, const std::vector>& y, const ui32 yRow);
bool ScalarLess(const std::shared_ptr& x, const std::shared_ptr& y);
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y);
diff --git a/ydb/core/formats/arrow/common/accessor.cpp b/ydb/core/formats/arrow/common/accessor.cpp
index 9865b2a692f7..775cffa95bab 100644
--- a/ydb/core/formats/arrow/common/accessor.cpp
+++ b/ydb/core/formats/arrow/common/accessor.cpp
@@ -1,4 +1,5 @@
#include "accessor.h"
+#include
#include
#include
#include
@@ -94,6 +95,10 @@ class TChunkAccessor {
}
+std::optional TTrivialArray::DoGetRawSize() const {
+ return NArrow::GetArrayDataSize(Array);
+}
+
std::partial_ordering IChunkedArray::TCurrentChunkAddress::Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const {
AFL_VERIFY(StartPosition <= position);
AFL_VERIFY(position < FinishPosition);
@@ -119,4 +124,12 @@ IChunkedArray::TCurrentChunkAddress TTrivialChunkedArray::DoGetChunk(const std::
return SelectChunk(chunkCurrent, position, accessor);
}
+std::optional TTrivialChunkedArray::DoGetRawSize() const {
+ ui64 result = 0;
+ for (auto&& i : Array->chunks()) {
+ result += NArrow::GetArrayDataSize(i);
+ }
+ return result;
+}
+
}
diff --git a/ydb/core/formats/arrow/common/accessor.h b/ydb/core/formats/arrow/common/accessor.h
index 3765d726992b..6021f47f5a88 100644
--- a/ydb/core/formats/arrow/common/accessor.h
+++ b/ydb/core/formats/arrow/common/accessor.h
@@ -84,19 +84,23 @@ class IChunkedArray {
YDB_READONLY_DEF(std::shared_ptr, DataType);
YDB_READONLY(ui64, RecordsCount, 0);
YDB_READONLY(EType, Type, EType::Undefined);
+ virtual std::optional DoGetRawSize() const = 0;
protected:
virtual std::shared_ptr DoGetChunkedArray() const = 0;
virtual TCurrentChunkAddress DoGetChunk(const std::optional& chunkCurrent, const ui64 position) const = 0;
template
TCurrentChunkAddress SelectChunk(const std::optional& chunkCurrent, const ui64 position, const TChunkAccessor& accessor) const {
- if (!chunkCurrent || position >= chunkCurrent->GetStartPosition() + chunkCurrent->GetLength()) {
+ if (!chunkCurrent || position >= chunkCurrent->GetStartPosition()) {
ui32 startIndex = 0;
ui64 idx = 0;
if (chunkCurrent) {
- AFL_VERIFY(chunkCurrent->GetChunkIndex() + 1 < accessor.GetChunksCount());
- startIndex = chunkCurrent->GetChunkIndex() + 1;
- idx = chunkCurrent->GetStartPosition() + chunkCurrent->GetLength();
+ if (position < chunkCurrent->GetFinishPosition()) {
+ return *chunkCurrent;
+ }
+ AFL_VERIFY(chunkCurrent->GetChunkIndex() < accessor.GetChunksCount());
+ startIndex = chunkCurrent->GetChunkIndex();
+ idx = chunkCurrent->GetStartPosition();
}
for (ui32 i = startIndex; i < accessor.GetChunksCount(); ++i) {
const ui64 nextIdx = idx + accessor.GetChunkLength(i);
@@ -105,7 +109,7 @@ class IChunkedArray {
}
idx = nextIdx;
}
- } else if (position < chunkCurrent->GetStartPosition()) {
+ } else {
AFL_VERIFY(chunkCurrent->GetChunkIndex() > 0);
ui64 idx = chunkCurrent->GetStartPosition();
for (i32 i = chunkCurrent->GetChunkIndex() - 1; i >= 0; --i) {
@@ -156,6 +160,10 @@ class IChunkedArray {
TString DebugString(const ui32 position) const;
};
+ std::optional GetRawSize() const {
+ return DoGetRawSize();
+ }
+
std::shared_ptr GetChunkedArray() const {
return DoGetChunkedArray();
}
@@ -180,6 +188,8 @@ class TTrivialArray: public IChunkedArray {
using TBase = IChunkedArray;
const std::shared_ptr Array;
protected:
+ virtual std::optional DoGetRawSize() const override;
+
virtual TCurrentChunkAddress DoGetChunk(const std::optional& /*chunkCurrent*/, const ui64 /*position*/) const override {
return TCurrentChunkAddress(Array, 0, 0);
}
@@ -204,6 +214,7 @@ class TTrivialChunkedArray: public IChunkedArray {
virtual std::shared_ptr DoGetChunkedArray() const override {
return Array;
}
+ virtual std::optional DoGetRawSize() const override;
public:
TTrivialChunkedArray(const std::shared_ptr& data)
diff --git a/ydb/core/formats/arrow/common/adapter.h b/ydb/core/formats/arrow/common/adapter.h
index 543e78511146..1b368e38de50 100644
--- a/ydb/core/formats/arrow/common/adapter.h
+++ b/ydb/core/formats/arrow/common/adapter.h
@@ -91,7 +91,7 @@ class TDataBuilderPolicy {
return batch;
}
[[nodiscard]] static std::shared_ptr ApplyArrowFilter(const std::shared_ptr& batch, const std::shared_ptr& filter) {
- auto table = batch->BuildTable();
+ auto table = batch->BuildTableVerified();
return std::make_shared(TDataBuilderPolicy::ApplyArrowFilter(table, filter));
}
[[nodiscard]] static std::shared_ptr GetEmptySame(const std::shared_ptr& batch) {
diff --git a/ydb/core/formats/arrow/common/container.cpp b/ydb/core/formats/arrow/common/container.cpp
index ccf8dc71fb0c..844e8d3343cf 100644
--- a/ydb/core/formats/arrow/common/container.cpp
+++ b/ydb/core/formats/arrow/common/container.cpp
@@ -1,50 +1,60 @@
#include "container.h"
#include
+#include
#include
namespace NKikimr::NArrow {
-NKikimr::TConclusionStatus TGeneralContainer::MergeColumnsStrictly(const TGeneralContainer& container) {
- if (RecordsCount != container.RecordsCount) {
+TConclusionStatus TGeneralContainer::MergeColumnsStrictly(const TGeneralContainer& container) {
+ if (!container.RecordsCount) {
+ return TConclusionStatus::Success();
+ }
+ if (!RecordsCount) {
+ RecordsCount = container.RecordsCount;
+ }
+ if (*RecordsCount != *container.RecordsCount) {
return TConclusionStatus::Fail(TStringBuilder() << "inconsistency records count in additional container: " <<
container.GetSchema()->ToString() << ". expected: " << RecordsCount << ", reality: " << container.GetRecordsCount());
}
for (i32 i = 0; i < container.Schema->num_fields(); ++i) {
auto addFieldResult = AddField(container.Schema->field(i), container.Columns[i]);
- if (!addFieldResult) {
+ if (addFieldResult.IsFail()) {
return addFieldResult;
}
}
return TConclusionStatus::Success();
}
-NKikimr::TConclusionStatus TGeneralContainer::AddField(const std::shared_ptr& f, const std::shared_ptr& data) {
+TConclusionStatus TGeneralContainer::AddField(const std::shared_ptr& f, const std::shared_ptr& data) {
AFL_VERIFY(f);
AFL_VERIFY(data);
- if (data->GetRecordsCount() != RecordsCount) {
+ if (RecordsCount && data->GetRecordsCount() != *RecordsCount) {
return TConclusionStatus::Fail(TStringBuilder() << "inconsistency records count in new column: " <<
f->name() << ". expected: " << RecordsCount << ", reality: " << data->GetRecordsCount());
}
if (!data->GetDataType()->Equals(f->type())) {
return TConclusionStatus::Fail("schema and data type are not equals: " + data->GetDataType()->ToString() + " vs " + f->type()->ToString());
}
- if (Schema->GetFieldByName(f->name())) {
- return TConclusionStatus::Fail("field name duplication: " + f->name());
- }
- auto resultAdd = Schema->AddField(Schema->num_fields(), f);
- if (!resultAdd.ok()) {
- return TConclusionStatus::Fail("internal schema error on add field: " + resultAdd.status().ToString());
+ {
+ auto conclusion = Schema->AddField(f);
+ if (conclusion.IsFail()) {
+ return conclusion;
+ }
}
- Schema = *resultAdd;
+ RecordsCount = data->GetRecordsCount();
Columns.emplace_back(data);
return TConclusionStatus::Success();
}
-TGeneralContainer::TGeneralContainer(const std::shared_ptr& schema, std::vector>&& columns)
- : Schema(schema)
- , Columns(std::move(columns))
-{
- AFL_VERIFY(schema);
+TConclusionStatus TGeneralContainer::AddField(const std::shared_ptr& f, const std::shared_ptr& data) {
+ return AddField(f, std::make_shared(data));
+}
+
+TConclusionStatus TGeneralContainer::AddField(const std::shared_ptr& f, const std::shared_ptr& data) {
+ return AddField(f, std::make_shared(data));
+}
+
+void TGeneralContainer::Initialize() {
std::optional recordsCount;
AFL_VERIFY(Schema->num_fields() == (i32)Columns.size())("schema", Schema->num_fields())("columns", Columns.size());
for (i32 i = 0; i < Schema->num_fields(); ++i) {
@@ -58,12 +68,34 @@ TGeneralContainer::TGeneralContainer(const std::shared_ptr& schem
}
}
AFL_VERIFY(recordsCount);
+ AFL_VERIFY(!RecordsCount || *RecordsCount == *recordsCount);
RecordsCount = *recordsCount;
}
+TGeneralContainer::TGeneralContainer(const std::vector>& fields, std::vector>&& columns)
+ : Schema(std::make_shared(fields))
+ , Columns(std::move(columns))
+{
+ Initialize();
+}
+
+TGeneralContainer::TGeneralContainer(const std::shared_ptr& schema, std::vector>&& columns)
+ : Schema(std::make_shared(schema))
+ , Columns(std::move(columns))
+{
+ Initialize();
+}
+
+TGeneralContainer::TGeneralContainer(const std::shared_ptr& schema, std::vector>&& columns)
+ : Schema(std::make_shared(schema))
+ , Columns(std::move(columns))
+{
+ Initialize();
+}
+
TGeneralContainer::TGeneralContainer(const std::shared_ptr& table) {
AFL_VERIFY(table);
- Schema = table->schema();
+ Schema = std::make_shared(table->schema());
RecordsCount = table->num_rows();
for (auto&& i : table->columns()) {
if (i->num_chunks() == 1) {
@@ -72,15 +104,22 @@ TGeneralContainer::TGeneralContainer(const std::shared_ptr& table)
Columns.emplace_back(std::make_shared(i));
}
}
+ Initialize();
}
TGeneralContainer::TGeneralContainer(const std::shared_ptr& table) {
AFL_VERIFY(table);
- Schema = table->schema();
+ Schema = std::make_shared(table->schema());
RecordsCount = table->num_rows();
for (auto&& i : table->columns()) {
Columns.emplace_back(std::make_shared(i));
}
+ Initialize();
+}
+
+TGeneralContainer::TGeneralContainer(const ui32 recordsCount)
+ : RecordsCount(recordsCount)
+ , Schema(std::make_shared()) {
}
std::shared_ptr TGeneralContainer::GetAccessorByNameVerified(const std::string& fieldId) const {
@@ -110,14 +149,78 @@ std::shared_ptr TGeneralContainer::BuildTableOptional(const std::o
if (fields.empty()) {
return nullptr;
}
- return arrow::Table::Make(std::make_shared(fields), columns, RecordsCount);
+ AFL_VERIFY(RecordsCount);
+ return arrow::Table::Make(std::make_shared(fields), columns, *RecordsCount);
}
-std::shared_ptr TGeneralContainer::BuildTable(const std::optional>& columnNames /*= {}*/) const {
+std::shared_ptr TGeneralContainer::BuildTableVerified(const std::optional>& columnNames /*= {}*/) const {
auto result = BuildTableOptional(columnNames);
AFL_VERIFY(result);
AFL_VERIFY(!columnNames || result->schema()->num_fields() == (i32)columnNames->size());
return result;
}
+std::shared_ptr TGeneralContainer::GetAccessorByNameOptional(const std::string& fieldId) const {
+ int idx = Schema->GetFieldIndex(fieldId);
+ if (idx == -1) {
+ return nullptr;
+ }
+ AFL_VERIFY((ui32)idx < Columns.size())("idx", idx)("count", Columns.size());
+ return Columns[idx];
+}
+
+TConclusionStatus TGeneralContainer::SyncSchemaTo(const std::shared_ptr& schema, const IFieldsConstructor* defaultFieldsConstructor, const bool forceDefaults) {
+ std::shared_ptr schemaNew = std::make_shared();
+ std::vector> columnsNew;
+ if (!RecordsCount) {
+ return TConclusionStatus::Fail("original container has not data");
+ }
+ for (auto&& i : schema->fields()) {
+ const int idx = Schema->GetFieldIndex(i->name());
+ if (idx == -1) {
+ if (!defaultFieldsConstructor) {
+ return TConclusionStatus::Fail("haven't field for sync: '" + i->name() + "'");
+ } else {
+ schemaNew->AddField(i).Validate();
+ auto defConclusion = defaultFieldsConstructor->GetDefaultColumnElementValue(i, forceDefaults);
+ if (defConclusion.IsFail()) {
+ return defConclusion;
+ }
+ columnsNew.emplace_back(std::make_shared(NArrow::TThreadSimpleArraysCache::Get(i->type(), *defConclusion, *RecordsCount)));
+ }
+ } else {
+ const auto& fOwned = Schema->GetFieldVerified(idx);
+ if (!fOwned->type()->Equals(i->type())) {
+ return TConclusionStatus::Fail("different field types for '" + i->name() + "'. Have " + fOwned->type()->ToString() + ", need " + i->type()->ToString());
+ }
+ schemaNew->AddField(fOwned).Validate();
+ columnsNew.emplace_back(Columns[idx]);
+ }
+ }
+ std::swap(Schema, schemaNew);
+ std::swap(columnsNew, Columns);
+ return TConclusionStatus::Success();
+}
+
+TString TGeneralContainer::DebugString() const {
+ TStringBuilder result;
+ if (RecordsCount) {
+ result << "records_count=" << *RecordsCount << ";";
+ }
+ result << "schema=" << Schema->ToString() << ";";
+ return result;
+}
+
+TConclusion> IFieldsConstructor::GetDefaultColumnElementValue(const std::shared_ptr& field, const bool force) const {
+ AFL_VERIFY(field);
+ auto result = DoGetDefaultColumnElementValue(field->name());
+ if (result) {
+ return result;
+ }
+ if (force) {
+ return NArrow::DefaultScalar(field->type());
+ }
+ return TConclusionStatus::Fail("have not default value for column " + field->name());
+}
+
}
diff --git a/ydb/core/formats/arrow/common/container.h b/ydb/core/formats/arrow/common/container.h
index 25262d14ff4a..e2a05cf3f135 100644
--- a/ydb/core/formats/arrow/common/container.h
+++ b/ydb/core/formats/arrow/common/container.h
@@ -1,7 +1,10 @@
#pragma once
#include "accessor.h"
+#include
+
#include
+#include
#include
#include
@@ -12,50 +15,76 @@
namespace NKikimr::NArrow {
+class IFieldsConstructor {
+private:
+ virtual std::shared_ptr DoGetDefaultColumnElementValue(const std::string& fieldName) const = 0;
+public:
+ TConclusion> GetDefaultColumnElementValue(const std::shared_ptr& field, const bool force) const;
+};
+
class TGeneralContainer {
private:
- YDB_READONLY(ui64, RecordsCount, 0);
- YDB_READONLY_DEF(std::shared_ptr, Schema);
+ YDB_READONLY_DEF(std::optional, RecordsCount);
+ YDB_READONLY_DEF(std::shared_ptr, Schema);
std::vector> Columns;
+ void Initialize();
public:
- TString DebugString() const {
- return TStringBuilder()
- << "records_count=" << RecordsCount << ";"
- << "schema=" << Schema->ToString() << ";"
- ;
+ TGeneralContainer(const ui32 recordsCount);
+
+ ui32 GetRecordsCountVerified() const {
+ AFL_VERIFY(RecordsCount);
+ return *RecordsCount;
+ }
+
+ TString DebugString() const;
+
+ [[nodiscard]] TConclusionStatus SyncSchemaTo(const std::shared_ptr& schema,
+ const IFieldsConstructor* defaultFieldsConstructor, const bool forceDefaults);
+
+ bool HasColumn(const std::string& name) {
+ return Schema->HasField(name);
+ }
+
+ ui64 num_columns() const {
+ return Columns.size();
}
ui64 num_rows() const {
- return RecordsCount;
+ AFL_VERIFY(RecordsCount);
+ return *RecordsCount;
+ }
+
+ ui32 GetColumnsCount() const {
+ return Columns.size();
+ }
+
+ const std::shared_ptr& GetColumnVerified(const ui32 idx) const {
+ AFL_VERIFY(idx < Columns.size());
+ return Columns[idx];
}
- std::shared_ptr BuildTable(const std::optional>& columnNames = {}) const;
+ std::shared_ptr BuildTableVerified(const std::optional>& columnNames = {}) const;
std::shared_ptr BuildTableOptional(const std::optional>& columnNames = {}) const;
std::shared_ptr BuildEmptySame() const;
[[nodiscard]] TConclusionStatus MergeColumnsStrictly(const TGeneralContainer& container);
[[nodiscard]] TConclusionStatus AddField(const std::shared_ptr& f, const std::shared_ptr& data);
+ [[nodiscard]] TConclusionStatus AddField(const std::shared_ptr& f, const std::shared_ptr& data);
- TGeneralContainer(const std::shared_ptr& table);
+ [[nodiscard]] TConclusionStatus AddField(const std::shared_ptr& f, const std::shared_ptr& data);
+ TGeneralContainer(const std::shared_ptr& table);
TGeneralContainer(const std::shared_ptr& table);
-
TGeneralContainer(const std::shared_ptr& schema, std::vector>&& columns);
+ TGeneralContainer(const std::shared_ptr& schema, std::vector>&& columns);
+ TGeneralContainer(const std::vector>& fields, std::vector>&& columns);
arrow::Status ValidateFull() const {
return arrow::Status::OK();
}
- std::shared_ptr GetAccessorByNameOptional(const std::string& fieldId) const {
- for (i32 i = 0; i < Schema->num_fields(); ++i) {
- if (Schema->field(i)->name() == fieldId) {
- return Columns[i];
- }
- }
- return nullptr;
- }
-
+ std::shared_ptr GetAccessorByNameOptional(const std::string& fieldId) const;
std::shared_ptr GetAccessorByNameVerified(const std::string& fieldId) const;
};
diff --git a/ydb/core/formats/arrow/modifier/schema.cpp b/ydb/core/formats/arrow/modifier/schema.cpp
new file mode 100644
index 000000000000..4cf792614802
--- /dev/null
+++ b/ydb/core/formats/arrow/modifier/schema.cpp
@@ -0,0 +1,69 @@
+#include "schema.h"
+#include
+#include
+
+namespace NKikimr::NArrow::NModifier {
+
+std::shared_ptr TSchema::Finish() {
+ AFL_VERIFY(!Finished);
+ Finished = true;
+ return std::make_shared(Fields);
+}
+
+const std::shared_ptr& TSchema::GetFieldByName(const std::string& name) const {
+ AFL_VERIFY(!Finished);
+ auto it = IndexByName.find(name);
+ if (it == IndexByName.end()) {
+ return Default>();
+ } else {
+ return Fields[it->second];
+ }
+}
+
+TConclusionStatus TSchema::AddField(const std::shared_ptr& f) {
+ AFL_VERIFY(!Finished);
+ if (!IndexByName.emplace(f->name(), Fields.size()).second) {
+ return TConclusionStatus::Fail("field name duplication: " + f->name());
+ }
+ Fields.emplace_back(f);
+ return TConclusionStatus::Success();
+}
+
+TString TSchema::ToString() const {
+ TStringBuilder result;
+ for (auto&& i : Fields) {
+ result << i->ToString() << ";";
+ }
+ return result;
+}
+
+const std::shared_ptr& TSchema::field(const ui32 index) const {
+ AFL_VERIFY(index < Fields.size());
+ return Fields[index];
+}
+
+const std::shared_ptr& TSchema::GetFieldVerified(const ui32 index) const {
+ AFL_VERIFY(index < Fields.size());
+ return Fields[index];
+}
+
+void TSchema::Initialize(const std::vector>& fields) {
+ AFL_VERIFY(!Initialized);
+ Initialized = true;
+ for (auto&& i : fields) {
+ IndexByName.emplace(i->name(), Fields.size());
+ Fields.emplace_back(i);
+ }
+}
+
+TSchema::TSchema(const std::shared_ptr& schema) {
+ AFL_VERIFY(schema);
+ Initialize(schema->Fields);
+}
+
+TSchema::TSchema(const std::shared_ptr& schema) {
+ AFL_VERIFY(schema);
+ Initialize(schema->fields());
+}
+
+}
\ No newline at end of file
diff --git a/ydb/core/formats/arrow/modifier/schema.h b/ydb/core/formats/arrow/modifier/schema.h
new file mode 100644
index 000000000000..dc663bad9f6a
--- /dev/null
+++ b/ydb/core/formats/arrow/modifier/schema.h
@@ -0,0 +1,55 @@
+#pragma once
+#include
+#include
+#include
+
+namespace NKikimr::NArrow::NModifier {
+class TSchema {
+private:
+ bool Initialized = false;
+ THashMap IndexByName;
+ std::vector> Fields;
+ bool Finished = false;
+
+ void Initialize(const std::vector