Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/grpc_services/rpc_replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
case NKikimrReplication::TReplicationState::kDone:
to.mutable_done();
break;
case NKikimrReplication::TReplicationState::kPaused:
to.mutable_paused();
break;
default:
break;
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2674,6 +2674,12 @@ class TKqpGatewayProxy : public IKikimrGateway {
auto& state = *op.MutableState();
state.MutableDone()->SetFailoverMode(
static_cast<NKikimrReplication::TReplicationState::TDone::EFailoverMode>(done->FailoverMode));
} else if (const auto& paused = settings.Settings.StatePaused) {
auto& state = *op.MutableState();
state.MutablePaused();
} else if (const auto& standBy = settings.Settings.StateStandBy) {
auto& state = *op.MutableState();
state.MutableStandBy();
}

if (settings.Settings.ConnectionString || settings.Settings.Endpoint || settings.Settings.Database ||
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,10 @@ namespace {
auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
if (to_lower(value) == "done") {
dstSettings.EnsureStateDone();
} else if (to_lower(value) == "paused") {
dstSettings.StatePaused = true;
} else if (to_lower(value) == "standby") {
dstSettings.StateStandBy = true;
} else {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
TStringBuilder() << "Unknown " << objectName << " state: " << value));
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,8 @@ struct TReplicationSettingsBase {
TMaybe<TOAuthToken> OAuthToken;
TMaybe<TStaticCredentials> StaticCredentials;
TMaybe<TStateDone> StateDone;
bool StatePaused = false;
bool StateStandBy = false;

using EFailoverMode = TStateDone::EFailoverMode;
TStateDone& EnsureStateDone(EFailoverMode mode = EFailoverMode::Consistent) {
Expand Down
37 changes: 24 additions & 13 deletions ydb/core/tx/replication/controller/dst_alterer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,28 +139,38 @@ class TDstAlterer: public TActorBootstrapped<TDstAlterer> {
ui64 rid,
ui64 tid,
TReplication::ETargetKind kind,
const TPathId& dstPathId)
const TPathId& dstPathId,
const TReplication::EState desiredState)
: Parent(parent)
, SchemeShardId(schemeShardId)
, ReplicationId(rid)
, TargetId(tid)
, Kind(kind)
, DstPathId(dstPathId)
, DesiredState(desiredState)
, LogPrefix("DstAlterer", ReplicationId, TargetId)
{
}

void Bootstrap() {
if (!DstPathId) {
Success();
} else {
switch (Kind) {
case TReplication::ETargetKind::Table:
case TReplication::ETargetKind::IndexTable:
return AllocateTxId();
case TReplication::ETargetKind::Transfer:
switch (DesiredState) {
case TReplication::EState::Done:
if (!DstPathId) {
return Success();
}
} else {
switch (Kind) {
case TReplication::ETargetKind::Table:
case TReplication::ETargetKind::IndexTable:
return AllocateTxId();
case TReplication::ETargetKind::Transfer:
return Success();
}
}
case TReplication::EState::Paused:
case TReplication::EState::Ready:
case TReplication::EState::Error:
case TReplication::EState::Removing:
return Success();
}
}

Expand All @@ -179,6 +189,7 @@ class TDstAlterer: public TActorBootstrapped<TDstAlterer> {
const ui64 TargetId;
const TReplication::ETargetKind Kind;
const TPathId DstPathId;
const TReplication::EState DesiredState;
const TActorLogPrefix LogPrefix;

ui64 TxId = 0;
Expand All @@ -191,13 +202,13 @@ IActor* CreateDstAlterer(TReplication* replication, ui64 targetId, const TActorC
const auto* target = replication->FindTarget(targetId);
Y_ABORT_UNLESS(target);
return CreateDstAlterer(ctx.SelfID, replication->GetSchemeShardId(),
replication->GetId(), target->GetId(), target->GetKind(), target->GetDstPathId());
replication->GetId(), target->GetId(), target->GetKind(), target->GetDstPathId(), replication->GetDesiredState());
}

IActor* CreateDstAlterer(const TActorId& parent, ui64 schemeShardId,
ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TPathId& dstPathId)
ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TPathId& dstPathId, TReplication::EState desiredState)
{
return new TDstAlterer(parent, schemeShardId, rid, tid, kind, dstPathId);
return new TDstAlterer(parent, schemeShardId, rid, tid, kind, dstPathId, desiredState);
}

}
2 changes: 1 addition & 1 deletion ydb/core/tx/replication/controller/dst_alterer.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ namespace NKikimr::NReplication::NController {

IActor* CreateDstAlterer(TReplication* replication, ui64 targetId, const TActorContext& ctx);
IActor* CreateDstAlterer(const TActorId& parent, ui64 schemeShardId,
ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TPathId& dstPathId);
ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TPathId& dstPathId, TReplication::EState desiredState);

}
3 changes: 2 additions & 1 deletion ydb/core/tx/replication/controller/replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class TReplication::TImpl: public TLagProvider {

switch (State) {
case EState::Ready:
case EState::Paused:
if (!Targets) {
return DiscoverTargets(ctx);
} else {
Expand Down Expand Up @@ -387,7 +388,7 @@ void TReplication::RemovePendingAlterTarget(ui64 id) {
}

bool TReplication::CheckAlterDone() const {
return Impl->State == EState::Ready && Impl->PendingAlterTargets.empty();
return (Impl->State == EState::Ready || Impl->State == EState::Paused) && Impl->PendingAlterTargets.empty();
}

void TReplication::UpdateLag(ui64 targetId, TDuration lag) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/replication/controller/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class TReplication: public TSimpleRefCount<TReplication> {
Ready,
Done,
Removing,
Paused,
Error = 255
};

Expand All @@ -42,6 +43,7 @@ class TReplication: public TSimpleRefCount<TReplication> {
Alter,
Done,
Removing,
Paused,
Error = 255
};

Expand Down
31 changes: 26 additions & 5 deletions ydb/core/tx/replication/controller/target_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,28 @@ void TTargetBase::SetDstState(const EDstState value) {
DstState = value;
switch (DstState) {
case EDstState::Alter:
return Replication->AddPendingAlterTarget(Id);
case EDstState::Done:
return Replication->RemovePendingAlterTarget(Id);
case EDstState::Ready:
PendingRemoveWorkers = false;
Replication->AddPendingAlterTarget(Id);
break;
default:
Replication->RemovePendingAlterTarget(Id);
break;
}

if (DstState != EDstState::Creating) {
Reset(DstCreator);
}
if (DstState != EDstState::Ready) {
Reset(WorkerRegistar);
}
if (DstState != EDstState::Alter) {
Reset(DstAlterer);
}
if (DstState != EDstState::Removing) {
Reset(DstRemover);
}
if (DstState != EDstState::Alter && DstState != EDstState::Removing) {
PendingRemoveWorkers = false;
}
}

const TPathId& TTargetBase::GetDstPathId() const {
Expand Down Expand Up @@ -187,6 +200,8 @@ void TTargetBase::Progress(const TActorContext& ctx) {
DstRemover = ctx.Register(CreateDstRemover(Replication, Id, ctx));
}
break;
case EDstState::Paused:
break;
case EDstState::Error:
break;
}
Expand All @@ -207,6 +222,12 @@ void TTargetBase::Shutdown(const TActorContext& ctx) {
}
}

void TTargetBase::Reset(TActorId& id) {
if (auto actorId = std::exchange(id, {})) {
TlsActivationContext->AsActorContext().Send(actorId, new TEvents::TEvPoison());
}
}

void TTargetBase::UpdateConfig(const NKikimrReplication::TReplicationConfig&) {
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/controller/target_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class TTargetBase

void Progress(const TActorContext& ctx) override;
void Shutdown(const TActorContext& ctx) override;
void Reset(TActorId& actorId);

void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class TController::TTxAlterDstResult: public TTxBase {

if (Replication->CheckAlterDone()) {
CLOG_N(ctx, "Replication altered"
<< ": rid# " << rid);
<< ": rid# " << rid
<< ", state# " << Replication->GetDesiredState());
Replication->SetState(Replication->GetDesiredState());
}
} else {
Expand Down Expand Up @@ -102,6 +103,8 @@ class TController::TTxAlterDstResult: public TTxBase {
return TReplication::EDstState::Error;
case TReplication::EState::Removing:
return TReplication::EDstState::Removing;
case TReplication::EState::Paused:
return TReplication::EDstState::Paused;
}
}

Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/replication/controller/tx_alter_replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ class TController::TTxAlterReplication: public TTxBase {
desiredState = TReplication::EState::Done;
alter = true;
break;
case NKikimrReplication::TReplicationState::kPaused:
desiredState = TReplication::EState::Paused;
alter = true;
break;
case NKikimrReplication::TReplicationState::kStandBy:
desiredState = TReplication::EState::Ready;
alter = true;
break;
default:
Y_ABORT("Invalid state");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ class TController::TTxDescribeReplication: public TTxBase {
case TReplication::EState::Done:
state.MutableDone();
break;
case TReplication::EState::Paused:
state.MutablePaused();
break;
case TReplication::EState::Error:
if (auto issue = state.MutableError()->AddIssues()) {
issue->set_severity(static_cast<uint32_t>(NYdb::NIssue::ESeverity::Error));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ class TAlterReplication: public TSubOperation {
using TState = NKikimrReplication::TReplicationState;
switch (desc.GetState().GetStateCase()) {
case TState::kStandBy:
if (newState.GetStateCase() != TState::kDone) {
if (!THashSet<TState::StateCase>{TState::kPaused, TState::kDone}.contains(newState.GetStateCase())) {
result.SetError(NKikimrScheme::StatusInvalidParameter, "Cannot switch state");
return false;
}
Expand Down
18 changes: 9 additions & 9 deletions ydb/core/tx/schemeshard/ut_replication/ut_replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,15 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
Paused {
}
}
)", {NKikimrScheme::StatusInvalidParameter});
)");

TestAlterReplication(runtime, ++txId, "/MyRoot", R"(
Name: "Replication"
State {
StandBy {
}
}
)");

TestAlterReplication(runtime, ++txId, "/MyRoot", R"(
Name: "Replication"
Expand All @@ -275,14 +283,6 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
NLs::PathExist,
NLs::ReplicationState(NKikimrReplication::TReplicationState::kDone),
});

TestAlterReplication(runtime, ++txId, "/MyRoot", R"(
Name: "Replication"
State {
StandBy {
}
}
)", {NKikimrScheme::StatusInvalidParameter});
}

Y_UNIT_TEST(Describe) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/schemeshard/ut_transfer/ut_transfer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,15 +384,15 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
Paused {
}
}
)", {NKikimrScheme::StatusInvalidParameter});
)");

TestAlterTransfer(runtime, ++txId, "/MyRoot", R"(
Name: "Transfer"
State {
StandBy {
}
}
)", {NKikimrScheme::StatusInvalidParameter});
)");
}

} // TTransferTests
4 changes: 4 additions & 0 deletions ydb/public/api/protos/draft/ydb_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ message DescribeReplicationResult {
message DoneState {
}

message PausedState {
}

// Description of scheme object.
Ydb.Scheme.Entry self = 1;

Expand All @@ -90,6 +93,7 @@ message DescribeReplicationResult {
RunningState running = 4;
ErrorState error = 5;
DoneState done = 6;
PausedState paused = 9;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class TRunningState {

struct TDoneState {};

struct TPausedState {};

class TErrorState {
class TImpl;

Expand Down Expand Up @@ -131,6 +133,7 @@ class TReplicationDescription {
Running,
Error,
Done,
Paused,
};

explicit TReplicationDescription(const Ydb::Replication::DescribeReplicationResult& desc);
Expand All @@ -145,6 +148,7 @@ class TReplicationDescription {
const TRunningState& GetRunningState() const;
const TErrorState& GetErrorState() const;
const TDoneState& GetDoneState() const;
const TPausedState& GetPausedState() const;

private:
TConnectionParams ConnectionParams_;
Expand All @@ -158,7 +162,8 @@ class TReplicationDescription {
std::variant<
TRunningState,
TErrorState,
TDoneState
TDoneState,
TPausedState
> State_;
};

Expand Down
8 changes: 8 additions & 0 deletions ydb/public/sdk/cpp/src/client/draft/ydb_replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ TReplicationDescription::TReplicationDescription(const Ydb::Replication::Describ
State_ = TDoneState();
break;

case Ydb::Replication::DescribeReplicationResult::kPaused:
State_ = TPausedState();
break;

default:
break;
}
Expand Down Expand Up @@ -200,6 +204,10 @@ const TDoneState& TReplicationDescription::GetDoneState() const {
return std::get<TDoneState>(State_);
}

const TPausedState& TReplicationDescription::GetPausedState() const {
return std::get<TPausedState>(State_);
}

TDescribeReplicationResult::TDescribeReplicationResult(TStatus&& status, Ydb::Replication::DescribeReplicationResult&& desc)
: NScheme::TDescribePathResult(std::move(status), desc.self())
, ReplicationDescription_(desc)
Expand Down
Loading
Loading