Skip to content

Commit

Permalink
more work
Browse files Browse the repository at this point in the history
  • Loading branch information
Sazonov99 committed Mar 10, 2025
1 parent 744d6f3 commit 1f4adf1
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@ class TCheckRangeActor: public NActors::TActorBootstrapped<TCheckRangeActor>

void Bootstrap(const NActors::TActorContext& ctx);

protected:
private:
void ReplyAndDie(const NActors::TActorContext& ctx, const NProto::TError& error);

void ReplyAndDie(
const NActors::TActorContext& ctx,
std::unique_ptr<TEvService::TEvCheckRangeResponse>);

private:
virtual void HandleReadBlocksResponse(
void HandleReadBlocksResponse(
const TEvService::TEvReadBlocksResponse::TPtr& ev,
const NActors::TActorContext& ctx);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,17 @@ namespace {

class TMirrorCheckRangeActor final: public TCheckRangeActor
{
private:
ui32 ReplicaCount;
bool IsRecievedError;
NProto::TError Status;

public:
TMirrorCheckRangeActor(
const NActors::TActorId& partition,
NProto::TCheckRangeRequest&& request,
const ui32 replicaCount,
TRequestInfoPtr&& requestInfo);
using TCheckRangeActor::TCheckRangeActor;

void Bootstrap(const TActorContext& ctx);

void HandleReadBlocksResponse(
const TEvService::TEvReadBlocksResponse::TPtr& ev,
const NActors::TActorContext& ctx) override;

private:
void SendReadBlocksRequest(const TActorContext& ctx) override;
};

////////////////////////////////////////////////////////////////////////////////

TMirrorCheckRangeActor::TMirrorCheckRangeActor(
const NActors::TActorId& partition,
NProto::TCheckRangeRequest&& request,
const ui32 replicaCount,
TRequestInfoPtr&& requestInfo)
: TCheckRangeActor(partition, std::move(request), std::move(requestInfo))
, ReplicaCount(replicaCount)
{}

void TMirrorCheckRangeActor::SendReadBlocksRequest(const TActorContext& ctx)
{
Expand All @@ -66,52 +45,13 @@ void TMirrorCheckRangeActor::SendReadBlocksRequest(const TActorContext& ctx)
request->Record.SetBlocksCount(Request.GetBlocksCount());

auto* headers = request->Record.MutableHeaders();
headers->SetReplicaCount(ReplicaCount);
headers->SetReplicaCount(Request.headers().GetReplicaCount());
headers->SetClientId(clientId);
headers->SetIsBackgroundRequest(true);

NCloud::Send(ctx, Partition, std::move(request));
}

void TMirrorCheckRangeActor::HandleReadBlocksResponse(
const TEvService::TEvReadBlocksResponse::TPtr& ev,
const TActorContext& ctx)
{
const auto* msg = ev->Get();
auto response =
std::make_unique<TEvService::TEvCheckRangeResponse>(MakeError(S_OK));

const auto& error = msg->Record.GetError();

if (!HasError(error)) {
// calculateCheckSums
} else {
LOG_ERROR_S(
ctx,
TBlockStoreComponents::PARTITION,
"reading error has occurred: " << FormatError(error));

if (!HasError(Status)) {
Status.CopyFrom(error);

if (error.GetCode() == E_REJECTED) {
--ReplicaCount;
SendReadBlocksRequest(ctx);
return;
}
} else {
Status.MutableMessage()
->append(" \n Error while reading from ")
.append(ToString(ReplicaCount))
.append(" replicas:\n")
.append(error.GetMessage());
}
}

response->Record.MutableStatus()->CopyFrom(Status);
ReplyAndDie(ctx, std::move(response));
}

} // namespace

//////////////////////////////////////////////////////////////////
Expand All @@ -134,13 +74,11 @@ void TMirrorPartitionActor::HandleCheckRange(
NCloud::Reply(ctx, *ev, std::move(response));
return;
}
auto replicaCount = State.GetReplicaInfos().size();

NCloud::Register<TMirrorCheckRangeActor>(
ctx,
SelfId(),
std::move(record),
replicaCount,
CreateRequestInfo(ev->Sender, ev->Cookie, ev->Get()->CallContext));
}

Expand Down
3 changes: 2 additions & 1 deletion cloud/blockstore/libs/storage/partition_nonrepl/ut_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,13 @@ class TPartitionClient
}

std::unique_ptr<TEvService::TEvCheckRangeRequest>
CreateCheckRangeRequest(TString id, ui32 startIndex, ui32 size)
CreateCheckRangeRequest(TString id, ui32 startIndex, ui32 size, ui32 replicaCount = 3)
{
auto request = std::make_unique<TEvService::TEvCheckRangeRequest>();
request->Record.SetDiskId(id);
request->Record.SetStartIndex(startIndex);
request->Record.SetBlocksCount(size);
request->Record.mutable_headers()->SetReplicaCount(replicaCount);
return request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ void TCheckRangeActor::Bootstrap(const TActorContext& ctx)
request->Record.SetDiskId(Request.GetDiskId());
request->Record.SetStartIndex(Request.GetStartIndex());
request->Record.SetBlocksCount(Request.GetBlocksCount());
request->Record.mutable_headers()->SetReplicaCount(Request.GetReplicaCount());

LOG_INFO(
ctx,
Expand Down
14 changes: 11 additions & 3 deletions cloud/blockstore/libs/storage/service/service_actor_checkrange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ class TCheckRangeActor final: public TActorBootstrapped<TCheckRangeActor>
const TString DiskId;
const ui64 StartIndex;
const ui64 BlocksCount;
const ui32 ReplicaCount;

public:
TCheckRangeActor(
TRequestInfoPtr requestInfo,
TStorageConfigPtr config,
TString diskId,
ui64 startIndex,
ui64 blocksCount);
ui64 blocksCount,
ui32 replicaCount);

void Bootstrap(const TActorContext& ctx);

Expand All @@ -61,12 +63,14 @@ TCheckRangeActor::TCheckRangeActor(
TStorageConfigPtr config,
TString diskId,
ui64 startIndex,
ui64 blocksCount)
ui64 blocksCount,
ui32 replicaCount)
: RequestInfo(std::move(requestInfo))
, Config(std::move(config))
, DiskId(std::move(diskId))
, StartIndex(startIndex)
, BlocksCount(blocksCount)
, ReplicaCount(replicaCount)
{}

void TCheckRangeActor::Bootstrap(const TActorContext& ctx)
Expand All @@ -82,6 +86,9 @@ void TCheckRangeActor::CheckRange(const TActorContext& ctx)
request->Record.SetDiskId(DiskId);
request->Record.SetStartIndex(StartIndex);
request->Record.SetBlocksCount(BlocksCount);
auto* headers = request->Record.MutableHeaders();

headers->SetReplicaCount(ReplicaCount);

NCloud::Send(
ctx,
Expand Down Expand Up @@ -177,7 +184,8 @@ void TServiceActor::HandleCheckRange(
Config,
request.GetDiskId(),
request.GetStartIndex(),
request.GetBlocksCount());
request.GetBlocksCount(),
request.headers().GetReplicaCount());
}

} // namespace NCloud::NBlockStore::NStorage
5 changes: 4 additions & 1 deletion cloud/blockstore/libs/storage/testlib/service_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,12 +521,15 @@ TServiceClient::CreateAddTagsRequest(
std::unique_ptr<TEvService::TEvCheckRangeRequest> TServiceClient::CreateCheckRangeRequest(
const TString& diskId,
const ui64 startIndex,
const ui64 blocksCount)
const ui64 blocksCount,
const ui32 replicaCount)
{
auto request = std::make_unique<TEvService::TEvCheckRangeRequest>();
request->Record.SetDiskId(diskId);
request->Record.SetStartIndex(startIndex);
request->Record.SetBlocksCount(blocksCount);

request->Record.MutableHeaders()->SetReplicaCount(replicaCount);
return request;
}

Expand Down
3 changes: 2 additions & 1 deletion cloud/blockstore/libs/storage/testlib/service_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ class TServiceClient
std::unique_ptr<TEvService::TEvCheckRangeRequest> CreateCheckRangeRequest(
const TString& diskId,
const ui64 blockIdx,
const ui64 blockCount);
const ui64 blockCount,
const ui32 replicaCount = 3);

std::unique_ptr<TEvService::TEvCreateVolumeLinkRequest>
CreateCreateVolumeLinkRequest(
Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/private/api/protos/volume.proto
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ message TCheckRangeRequest

// Number of blobs per batch.
uint32 BlocksCount = 3;

// Number of replicas to read from m3 disks.
uint64 ReplicaCount = 4;
}

message TCheckRangeResponse
Expand Down

0 comments on commit 1f4adf1

Please sign in to comment.