Skip to content

Commit

Permalink
Merge 44f8ee3 into c1fc810
Browse files Browse the repository at this point in the history
  • Loading branch information
MBkkt authored Sep 25, 2024
2 parents c1fc810 + 44f8ee3 commit e4126db
Show file tree
Hide file tree
Showing 13 changed files with 1,637 additions and 336 deletions.
51 changes: 49 additions & 2 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1522,7 +1522,7 @@ message TEvLocalKMeansRequest {

// id of parent cluster
optional uint32 Parent = 15;
// [Child ... Child + K] ids reserved for our clusters
// [Child ... Child + K) ids reserved for this kmeans clusters
optional uint32 Child = 16;

optional string LevelName = 17;
Expand All @@ -1532,7 +1532,7 @@ message TEvLocalKMeansRequest {
repeated string DataColumns = 20;
}

message TEvLocalKMeansProgressResponse {
message TEvLocalKMeansResponse {
optional uint64 Id = 1;

optional uint64 TabletId = 2;
Expand All @@ -1552,6 +1552,53 @@ message TEvLocalKMeansProgressResponse {
// optional uint32 DoneRounds = 11;
}

message TEvReshuffleKMeansRequest {
optional uint64 Id = 1;

optional uint64 TabletId = 2;
optional NKikimrProto.TPathID PathId = 3;

optional uint64 SnapshotTxId = 4;
optional uint64 SnapshotStep = 5;

optional uint64 SeqNoGeneration = 6;
optional uint64 SeqNoRound = 7;

optional Ydb.Table.VectorIndexSettings Settings = 8;

optional TEvLocalKMeansRequest.EState Upload = 9;

// id of parent cluster
optional uint32 Parent = 10;
// [Child ... Child + ClustersSize) ids of this kmeans clusters
optional uint32 Child = 11;
// centroids of clusters
repeated string Clusters = 12;

optional string PostingName = 13;

optional string EmbeddingColumn = 14;
repeated string DataColumns = 15;
}

message TEvReshuffleKMeansResponse {
optional uint64 Id = 1;

optional uint64 TabletId = 2;
optional NKikimrProto.TPathID PathId = 3;

optional uint64 RequestSeqNoGeneration = 4;
optional uint64 RequestSeqNoRound = 5;

optional NKikimrIndexBuilder.EBuildStatus Status = 6;
repeated Ydb.Issue.IssueMessage Issues = 7;

// TODO(mbkkt) implement slow-path (reliable-path)
// optional uint64 RowsDelta = 8;
// optional uint64 BytesDelta = 9;
// optional last written primary key
}

message TEvCdcStreamScanRequest {
message TLimits {
optional uint32 BatchMaxBytes = 1 [default = 512000];
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/datashard/buffer_data.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

#include "ydb/core/scheme/scheme_tablecell.h"
#include "ydb/core/tx/datashard/upload_stats.h"
#include "ydb/core/tx/tx_proxy/upload_rows.h"
Expand Down
25 changes: 20 additions & 5 deletions ydb/core/tx/datashard/datashard.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,10 @@ struct TEvDataShard {
EvSampleKResponse,

EvLocalKMeansRequest,
EvLocalKMeansProgressResponse,
EvLocalKMeansResponse,

EvReshuffleKMeansRequest,
EvReshuffleKMeansResponse,

EvEnd
};
Expand Down Expand Up @@ -1457,16 +1460,28 @@ struct TEvDataShard {
TEvDataShard::EvSampleKResponse> {
};

struct TEvReshuffleKMeansRequest
: public TEventPB<TEvReshuffleKMeansRequest,
NKikimrTxDataShard::TEvReshuffleKMeansRequest,
TEvDataShard::EvReshuffleKMeansRequest> {
};

struct TEvReshuffleKMeansResponse
: public TEventPB<TEvReshuffleKMeansResponse,
NKikimrTxDataShard::TEvReshuffleKMeansResponse,
TEvDataShard::EvReshuffleKMeansResponse> {
};

struct TEvLocalKMeansRequest
: public TEventPB<TEvLocalKMeansRequest,
NKikimrTxDataShard::TEvLocalKMeansRequest,
TEvDataShard::EvLocalKMeansRequest> {
};

struct TEvLocalKMeansProgressResponse
: public TEventPB<TEvLocalKMeansProgressResponse,
NKikimrTxDataShard::TEvLocalKMeansProgressResponse,
TEvDataShard::EvLocalKMeansProgressResponse> {
struct TEvLocalKMeansResponse
: public TEventPB<TEvLocalKMeansResponse,
NKikimrTxDataShard::TEvLocalKMeansResponse,
TEvDataShard::EvLocalKMeansResponse> {
};

struct TEvKqpScan
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ class TDataShard
class TTxHandleSafeBuildIndexScan;
class TTxHandleSafeSampleKScan;
class TTxHandleSafeLocalKMeansScan;
class TTxHandleSafeReshuffleKMeansScan;
class TTxHandleSafeStatisticsScan;

class TTxMediatorStateRestored;
Expand Down Expand Up @@ -1325,6 +1326,8 @@ class TDataShard
void HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvSampleKRequest::TPtr& ev, const TActorContext& ctx);
void HandleSafe(TEvDataShard::TEvSampleKRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvReshuffleKMeansRequest::TPtr& ev, const TActorContext& ctx);
void HandleSafe(TEvDataShard::TEvReshuffleKMeansRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvLocalKMeansRequest::TPtr& ev, const TActorContext& ctx);
void HandleSafe(TEvDataShard::TEvLocalKMeansRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -3117,6 +3120,7 @@ class TDataShard
HFunc(TEvDataShard::TEvDiscardVolatileSnapshotRequest, Handle);
HFuncTraced(TEvDataShard::TEvBuildIndexCreateRequest, Handle);
HFunc(TEvDataShard::TEvSampleKRequest, Handle);
HFunc(TEvDataShard::TEvReshuffleKMeansRequest, Handle);
HFunc(TEvDataShard::TEvLocalKMeansRequest, Handle);
HFunc(TEvDataShard::TEvCdcStreamScanRequest, Handle);
HFunc(TEvPrivate::TEvCdcStreamScanRegistered, Handle);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard_ut_local_kmeans.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Y_UNIT_TEST_SUITE (TTxDataShardLocalKMeansScan) {
runtime.SendToPipe(tid, sender, ev.release(), 0, GetPipeConfigWithRetries());

TAutoPtr<IEventHandle> handle;
auto reply = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvLocalKMeansProgressResponse>(handle);
auto reply = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvLocalKMeansResponse>(handle);
UNIT_ASSERT_VALUES_EQUAL(reply->Record.GetStatus(), NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST);
}
}
Expand Down Expand Up @@ -148,7 +148,7 @@ Y_UNIT_TEST_SUITE (TTxDataShardLocalKMeansScan) {
runtime.SendToPipe(tid, sender, ev2.release(), 0, GetPipeConfigWithRetries());

TAutoPtr<IEventHandle> handle;
auto reply = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvLocalKMeansProgressResponse>(handle);
auto reply = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvLocalKMeansResponse>(handle);

NYql::TIssues issues;
NYql::IssuesFromMessage(reply->Record.GetIssues(), issues);
Expand Down
Loading

0 comments on commit e4126db

Please sign in to comment.