Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ void OutgoingMigration::Finish(const GenericError& error) {
switch (state_) {
case MigrationState::C_FATAL:
case MigrationState::C_FINISHED:
CloseSocket();
return; // Already finished, nothing else to do

case MigrationState::C_CONNECTING:
Expand All @@ -192,6 +193,9 @@ void OutgoingMigration::Finish(const GenericError& error) {
});
exec_st_.JoinErrorHandler();
}

// Close socket for clean disconnect.
CloseSocket();
}

MigrationState OutgoingMigration::GetState() const {
Expand Down
20 changes: 11 additions & 9 deletions src/server/protocol_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,21 @@ ProtocolClient::ProtocolClient(string host, uint16_t port) {
#ifdef DFLY_USE_SSL
MaybeInitSslCtx();
#endif
// We initialize the proactor thread here such that it never races with Sock().
// ProtocolClient is never migrated to a different thread, so this is safe.
socket_thread_ = ProactorBase::me();
}

// Constructs a client from an already-populated ServerContext, which is moved
// into server_context_.
ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::move(context)) {
#ifdef DFLY_USE_SSL
// Only compiled in when SSL support is enabled.
MaybeInitSslCtx();
#endif
// Record the proactor of the constructing thread as the socket thread.
socket_thread_ = ProactorBase::me();
}

ProtocolClient::~ProtocolClient() {
exec_st_.JoinErrorHandler();

// FIXME: We should close the socket explicitly outside of the destructor. This currently
// breaks test_cancel_replication_immediately.
if (sock_) {
std::error_code ec;
sock_->proactor()->Await([this, &ec]() { ec = sock_->Close(); });
LOG_IF(ERROR, ec) << "Error closing socket " << ec;
}
#ifdef DFLY_USE_SSL
if (ssl_ctx_) {
SSL_CTX_free(ssl_ctx_);
Expand Down Expand Up @@ -162,6 +160,7 @@ error_code ProtocolClient::ResolveHostDns() {
error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms,
ExecutionState* cntx) {
ProactorBase* mythread = ProactorBase::me();
DCHECK(mythread == socket_thread_);
CHECK(mythread);
{
unique_lock lk(sock_mu_);
Expand Down Expand Up @@ -235,6 +234,9 @@ void ProtocolClient::CloseSocket() {
auto ec = sock_->Shutdown(SHUT_RDWR);
LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
}
auto ec = sock_->Close(); // Quietly close.

LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
});
}
}
Expand Down Expand Up @@ -385,11 +387,11 @@ void ProtocolClient::ResetParser(RedisParser::Mode mode) {
}

// Returns the timestamp of the last recorded I/O (nanoseconds, monotonic clock).
// The value is read with a single relaxed atomic load.
uint64_t ProtocolClient::LastIoTime() const {
  return last_io_time_.load(std::memory_order_relaxed);
}

// Records "now" (monotonic clock of the client's proactor, in nanoseconds) as the
// time of the last I/O. The timestamp is written once, with a relaxed atomic store.
void ProtocolClient::TouchIoTime() {
  last_io_time_.store(Proactor()->GetMonotonicTimeNs(), std::memory_order_relaxed);
}

} // namespace dfly
12 changes: 10 additions & 2 deletions src/server/protocol_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ class ProtocolClient {
uint64_t LastIoTime() const;
void TouchIoTime();

// Sets socket_thread_ prior to the initialization of sock_.
// That way Proactor() returns the right thread even when sock_ is
// not yet initialized.
void SetSocketThread(util::fb2::ProactorBase* sock_thread) {
socket_thread_ = sock_thread;
}

protected:
struct ServerContext {
std::string host;
Expand Down Expand Up @@ -107,7 +114,7 @@ class ProtocolClient {
}

// Returns the proactor stored in socket_thread_. It deliberately does not go
// through sock_, so the call stays valid while sock_ is not yet initialized.
auto* Proactor() const {
  return socket_thread_;
}

util::FiberSocketBase* Sock() const {
Expand All @@ -132,7 +139,7 @@ class ProtocolClient {
std::string last_cmd_;
std::string last_resp_;

uint64_t last_io_time_ = 0; // in ns, monotonic clock.
std::atomic<uint64_t> last_io_time_ = 0; // in ns, monotonic clock.

#ifdef DFLY_USE_SSL

Expand All @@ -142,6 +149,7 @@ class ProtocolClient {
#else
void* ssl_ctx_{nullptr};
#endif
util::fb2::ProactorBase* socket_thread_;
};

} // namespace dfly
Expand Down
82 changes: 48 additions & 34 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,17 @@ std::optional<Replica::LastMasterSyncData> Replica::Stop() {
sync_fb_.JoinIfNeeded();
DVLOG(1) << "MainReplicationFb stopped " << this;
acks_fb_.JoinIfNeeded();
for (auto& flow : shard_flows_) {
flow.reset();
}

proactor_->Await([this]() {
// Destructor is blocking, so other fibers can observe partial state
// of flows during clean up. To avoid this, we move them and clear the
// member before the preemption point
auto shard_flows = std::move(shard_flows_);
shard_flows_.clear();
for (auto& flow : shard_flows) {
flow.reset();
}
});

if (last_journal_LSNs_.has_value()) {
return LastMasterSyncData{master_context_.master_repl_id, last_journal_LSNs_.value()};
Expand Down Expand Up @@ -501,29 +509,45 @@ error_code Replica::InitiatePSync() {
return error_code{};
}

// Initialize and start sub-replica for each flow.
error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_master_sync_data) {
auto start_time = absl::Now();

// Initialize MultiShardExecution.
multi_shard_exe_.reset(new MultiShardExecution());
// Rebuilds shard_flows_ with one DflyShardReplica per master flow
// (master_context_.num_flows entries).
//
// The new flows are assembled in a local vector and only assigned to shard_flows_
// as the very last step, so the member is replaced in one move instead of being
// reset element by element.
void Replica::InitializeShardFlows() {
  decltype(shard_flows_) shard_flows_copy;
  shard_flows_copy.resize(master_context_.num_flows);
  DCHECK(!shard_flows_copy.empty());
  thread_flow_map_ = Partition(shard_flows_copy.size());
  const size_t pool_sz = shard_set->pool()->size();

  for (size_t i = 0; i < shard_flows_copy.size(); ++i) {
    // Transfer LSN state for partial sync. The previous vector may be empty or
    // of a different size, so bound-check the index before reading from it.
    uint64_t partial_sync_lsn = 0;
    if (i < shard_flows_.size() && shard_flows_[i]) {
      partial_sync_lsn = shard_flows_[i]->JournalExecutedCount();
    }
    shard_flows_copy[i].reset(
        new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
    if (partial_sync_lsn > 0) {
      shard_flows_copy[i]->SetRecordsExecuted(partial_sync_lsn);
    }
  }

  // For each pool index, tag flows index, index + pool_sz, ... with the proactor
  // the callback is running on, so each flow knows its socket thread before its
  // socket exists.
  shard_set->pool()->AwaitFiberOnAll([pool_sz, this, &shard_flows_copy](auto index, auto* ctx) {
    for (unsigned i = index; i < shard_flows_copy.size(); i += pool_sz) {
      shard_flows_copy[i]->SetSocketThread(ProactorBase::me());
    }
  });
  // now update shard_flows on proactor thread
  shard_flows_ = std::move(shard_flows_copy);
}

// Initialize and start sub-replica for each flow.
error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_master_sync_data) {
auto start_time = absl::Now();

// Initialize MultiShardExecution.
multi_shard_exe_.reset(new MultiShardExecution());

// Initialize shard flows. The update to the shard_flows_ should be done by this thread.
// Otherwise, there is a race condition between GetSummary() and the shard_flows_[i].reset()
// below.
InitializeShardFlows();

// Block until all flows have received the full sync cut.
BlockingCounter sync_block{unsigned(shard_flows_.size())};
Expand Down Expand Up @@ -1210,11 +1234,12 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d

auto Replica::GetSummary() const -> Summary {
auto f = [this]() {
DCHECK(this);
auto last_io_time = LastIoTime();

// Note: we access LastIoTime from a foreign thread in an unsafe manner. However, specifically here
// it's unlikely to cause a real bug.
for (const auto& flow : shard_flows_) { // Get last io time from all sub flows.
for (const auto& flow : shard_flows_) {
DCHECK(Proactor() == ProactorBase::me());
DCHECK(flow);
last_io_time = std::max(last_io_time, flow->LastIoTime());
}

Expand Down Expand Up @@ -1246,25 +1271,14 @@ auto Replica::GetSummary() const -> Summary {
return res;
};

if (Sock())
return Proactor()->AwaitBrief(f);

/**
* when this branch happens: there is a very short grace period
* where Sock() is not initialized, yet the server can
* receive ROLE/INFO commands. That period happens when launching
* an instance with '--replicaof' and then immediately
* sending a command.
*
* In that instance, we have to run f() on the current fiber.
*/
return f();
return Proactor()->AwaitBrief(f);
}

std::vector<uint64_t> Replica::GetReplicaOffset() const {
std::vector<uint64_t> flow_rec_count;
flow_rec_count.resize(shard_flows_.size());
for (const auto& flow : shard_flows_) {
DCHECK(flow.get());
uint32_t flow_id = flow->FlowId();
uint64_t rec_count = flow->JournalExecutedCount();
DCHECK_LT(flow_id, shard_flows_.size());
Expand Down
2 changes: 2 additions & 0 deletions src/server/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ class Replica : ProtocolClient {
size_t GetRecCountExecutedPerShard(const std::vector<unsigned>& indexes) const;

private:
void InitializeShardFlows();

util::fb2::ProactorBase* proactor_ = nullptr;
Service& service_;
MasterContext master_context_;
Expand Down
Loading
Loading