Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/tx/replication/controller/event_util.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include "event_util.h"
#include "target_table.h"
#include "target_transfer.h"

namespace NKikimr::NReplication::NController {

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/controller/replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "secret_resolver.h"
#include "target_discoverer.h"
#include "target_table.h"
#include "target_transfer.h"
#include "tenant_resolver.h"
#include "util.h"

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/controller/target_discoverer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "private_events.h"
#include "target_discoverer.h"
#include "target_table.h"
#include "target_transfer.h"
#include "util.h"

#include <ydb/core/base/path.h>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "private_events.h"
#include "target_discoverer.h"
#include "target_table.h"
#include "target_transfer.h"

#include <ydb/core/tx/replication/ut_helpers/test_env.h>
#include <ydb/core/tx/replication/ut_helpers/test_table.h>
Expand Down
165 changes: 0 additions & 165 deletions ydb/core/tx/replication/controller/target_table.cpp
Original file line number Diff line number Diff line change
@@ -1,123 +1,16 @@
#include "event_util.h"
#include "logging.h"
#include "stream_consumer_remover.h"
#include "target_table.h"
#include "util.h"

#include <ydb/core/base/path.h>
#include <ydb/core/scheme/scheme_pathid.h>
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>

namespace NKikimr::NReplication::NController {

class TTableWorkerRegistar: public TActorBootstrapped<TTableWorkerRegistar> {
void Handle(TEvYdbProxy::TEvDescribeTopicResponse::TPtr& ev) {
LOG_T("Handle " << ev->Get()->ToString());

const auto& result = ev->Get()->Result;
if (!result.IsSuccess()) {
if (IsRetryableError(result)) {
LOG_W("Error of resolving topic '" << SrcStreamPath << "': " << ev->Get()->ToString() << ". Retry.");
return Retry();
}

LOG_E("Error of resolving topic '" << SrcStreamPath << "': " << ev->Get()->ToString() << ". Stop.");
return; // TODO: hard error
}

for (const auto& partition : result.GetTopicDescription().GetPartitions()) {
if (!partition.GetParentPartitionIds().empty()) {
continue;
}

auto ev = MakeRunWorkerEv(
ReplicationId, TargetId, Config, partition.GetPartitionId(),
ConnectionParams, ConsistencySettings, SrcStreamPath, SrcStreamConsumerName, DstPathId);
Send(Parent, std::move(ev));
}

PassAway();
}

void Retry() {
LOG_D("Retry");
Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup());
}

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::REPLICATION_CONTROLLER_TABLE_WORKER_REGISTAR;
}

explicit TTableWorkerRegistar(
const TActorId& parent,
const TActorId& proxy,
const NKikimrReplication::TConnectionParams& connectionParams,
const NKikimrReplication::TConsistencySettings& consistencySettings,
ui64 rid,
ui64 tid,
const TString& srcStreamPath,
const TString& srcStreamConsumerName,
const TPathId& dstPathId,
const TReplication::ITarget::IConfig::TPtr& config)
: Parent(parent)
, YdbProxy(proxy)
, ConnectionParams(connectionParams)
, ConsistencySettings(consistencySettings)
, ReplicationId(rid)
, TargetId(tid)
, SrcStreamPath(srcStreamPath)
, SrcStreamConsumerName(srcStreamConsumerName)
, DstPathId(dstPathId)
, LogPrefix("TableWorkerRegistar", ReplicationId, TargetId)
, Config(config)
{
}

void Bootstrap() {
Become(&TThis::StateWork);
Send(YdbProxy, new TEvYdbProxy::TEvDescribeTopicRequest(SrcStreamPath, {}));
}

STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvYdbProxy::TEvDescribeTopicResponse, Handle);
sFunc(TEvents::TEvWakeup, Bootstrap);
sFunc(TEvents::TEvPoison, PassAway);
}
}

private:
const TActorId Parent;
const TActorId YdbProxy;
const NKikimrReplication::TConnectionParams ConnectionParams;
const NKikimrReplication::TConsistencySettings ConsistencySettings;
const ui64 ReplicationId;
const ui64 TargetId;
const TString SrcStreamPath;
const TString SrcStreamConsumerName;
const TPathId DstPathId;
const TActorLogPrefix LogPrefix;
const TReplication::ITarget::IConfig::TPtr Config;

}; // TTableWorkerRegistar

TTargetTableBase::TTargetTableBase(TReplication* replication, ETargetKind finalKind,
ui64 id, const IConfig::TPtr& config)
: TTargetWithStream(replication, finalKind, id, config)
{
}

IActor* TTargetTableBase::CreateWorkerRegistar(const TActorContext& ctx) const {
auto replication = GetReplication();
const auto& config = replication->GetConfig();
return new TTableWorkerRegistar(ctx.SelfID, replication->GetYdbProxy(),
config.GetSrcConnectionParams(), config.GetConsistencySettings(),
replication->GetId(), GetId(), BuildStreamPath(), GetStreamConsumerName(), GetDstPathId(), GetConfig());
}

TTargetTable::TTargetTable(TReplication* replication, ui64 id, const IConfig::TPtr& config)
: TTargetTableBase(replication, ETargetKind::Table, id, config)
{
Expand All @@ -140,62 +33,4 @@ TString TTargetIndexTable::BuildStreamPath() const {
return CanonizePath(ChildPath(SplitPath(GetSrcPath()), {"indexImplTable", GetStreamName()}));
}

TTargetTransfer::TTargetTransfer(TReplication* replication, ui64 id, const IConfig::TPtr& config)
: TTargetTableBase(replication, ETargetKind::Transfer, id, config)
{
}

void TTargetTransfer::UpdateConfig(const NKikimrReplication::TReplicationConfig& cfg) {
auto& t = cfg.GetTransferSpecific().GetTargets(0);
Config = std::make_shared<TTargetTransfer::TTransferConfig>(
GetConfig()->GetSrcPath(),
GetConfig()->GetDstPath(),
t.GetTransformLambda());
}

void TTargetTransfer::Progress(const TActorContext& ctx) {
auto replication = GetReplication();

switch (GetStreamState()) {
case EStreamState::Removing:
if (GetWorkers()) {
RemoveWorkers(ctx);
} else if (!StreamConsumerRemover) {
StreamConsumerRemover = ctx.Register(CreateStreamConsumerRemover(replication, GetId(), ctx));
}
return;
case EStreamState::Creating:
case EStreamState::Ready:
case EStreamState::Removed:
case EStreamState::Error:
break;
}

TTargetWithStream::Progress(ctx);
}

void TTargetTransfer::Shutdown(const TActorContext& ctx) {
for (auto* x : TVector<TActorId*>{&StreamConsumerRemover}) {
if (auto actorId = std::exchange(*x, {})) {
ctx.Send(actorId, new TEvents::TEvPoison());
}
}

TTargetWithStream::Shutdown(ctx);
}

TString TTargetTransfer::BuildStreamPath() const {
return CanonizePath(GetSrcPath());
}

TTargetTransfer::TTransferConfig::TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda)
: TConfigBase(ETargetKind::Transfer, srcPath, dstPath)
, TransformLambda(transformLambda)
{
}

const TString& TTargetTransfer::TTransferConfig::GetTransformLambda() const {
return TransformLambda;
}

}
29 changes: 0 additions & 29 deletions ydb/core/tx/replication/controller/target_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class TTargetTableBase: public TTargetWithStream {
TString GetStreamPath() const override;

protected:
IActor* CreateWorkerRegistar(const TActorContext& ctx) const override;
virtual TString BuildStreamPath() const = 0;
};

Expand Down Expand Up @@ -50,32 +49,4 @@ class TTargetIndexTable: public TTargetTableBase {
TString BuildStreamPath() const override;
};

class TTargetTransfer: public TTargetTableBase {
public:
struct TTransferConfig : public TConfigBase {
using TPtr = std::shared_ptr<TTransferConfig>;

TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda);

const TString& GetTransformLambda() const;

private:
TString TransformLambda;
};

explicit TTargetTransfer(TReplication* replication,
ui64 id, const IConfig::TPtr& config);

void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override;

void Progress(const TActorContext& ctx) override;
void Shutdown(const TActorContext& ctx) override;

protected:
TString BuildStreamPath() const override;

private:
TActorId StreamConsumerRemover;
};

}
68 changes: 68 additions & 0 deletions ydb/core/tx/replication/controller/target_transfer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#include "stream_consumer_remover.h"
#include "target_transfer.h"

#include <ydb/core/base/path.h>
#include <ydb/core/tx/replication/service/service.h>
#include <ydb/library/actors/core/events.h>

namespace NKikimr::NReplication::NController {

TTargetTransfer::TTargetTransfer(TReplication* replication, ui64 id, const IConfig::TPtr& config)
: TTargetWithStream(replication, ETargetKind::Transfer, id, config)
{
}

void TTargetTransfer::UpdateConfig(const NKikimrReplication::TReplicationConfig& cfg) {
auto& t = cfg.GetTransferSpecific().GetTargets(0);
Config = std::make_shared<TTargetTransfer::TTransferConfig>(
GetConfig()->GetSrcPath(),
GetConfig()->GetDstPath(),
t.GetTransformLambda());
}

void TTargetTransfer::Progress(const TActorContext& ctx) {
auto replication = GetReplication();

switch (GetStreamState()) {
case EStreamState::Removing:
if (GetWorkers()) {
RemoveWorkers(ctx);
} else if (!StreamConsumerRemover) {
StreamConsumerRemover = ctx.Register(CreateStreamConsumerRemover(replication, GetId(), ctx));
}
return;
case EStreamState::Creating:
case EStreamState::Ready:
case EStreamState::Removed:
case EStreamState::Error:
break;
}

TTargetWithStream::Progress(ctx);
}

void TTargetTransfer::Shutdown(const TActorContext& ctx) {
for (auto* x : TVector<TActorId*>{&StreamConsumerRemover}) {
if (auto actorId = std::exchange(*x, {})) {
ctx.Send(actorId, new TEvents::TEvPoison());
}
}

TTargetWithStream::Shutdown(ctx);
}

TString TTargetTransfer::GetStreamPath() const {
return CanonizePath(GetSrcPath());
}

TTargetTransfer::TTransferConfig::TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda)
: TConfigBase(ETargetKind::Transfer, srcPath, dstPath)
, TransformLambda(transformLambda)
{
}

const TString& TTargetTransfer::TTransferConfig::GetTransformLambda() const {
return TransformLambda;
}

}
34 changes: 34 additions & 0 deletions ydb/core/tx/replication/controller/target_transfer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include "target_with_stream.h"

namespace NKikimr::NReplication::NController {

class TTargetTransfer: public TTargetWithStream {
public:
struct TTransferConfig : public TConfigBase {
using TPtr = std::shared_ptr<TTransferConfig>;

TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda);

const TString& GetTransformLambda() const;

private:
TString TransformLambda;
};

explicit TTargetTransfer(TReplication* replication,
ui64 id, const IConfig::TPtr& config);

void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override;

void Progress(const TActorContext& ctx) override;
void Shutdown(const TActorContext& ctx) override;

TString GetStreamPath() const override;

private:
TActorId StreamConsumerRemover;
};

}
Loading
Loading