Skip to content

Commit

Permalink
address corner case in resultset_wire_container_impl::annex
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Nov 6, 2024
1 parent d60666f commit 39db4ff
Showing 1 changed file with 36 additions and 25 deletions.
61 changes: 36 additions & 25 deletions src/tateyama/endpoint/ipc/bootstrap/server_wires_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,20 +152,27 @@ class server_wire_container_impl : public server_wire_container

void operator()() {
while (true) {
std::unique_lock<std::mutex> lock(mtx_queue_);
cnd_queue_.wait(lock, [this]{ return !queue_.empty() || released_; });
if (queue_.empty() && released_) {
VLOG_LP(log_trace) << "exit writer annex mode because the end of the records";
write_complete();
break;
{
std::unique_lock<std::mutex> lock(mtx_queue_);

cnd_queue_.wait(lock, [this]{ return !queue_.empty() || released_; });
if (queue_.empty() && released_) {
VLOG_LP(log_trace) << "exit writer annex mode because the end of the records";
write_complete();
break;
}
auto annex = queue_.front().get();
lock.unlock();
if (auto rv = annex->transfer(shm_resultset_wire_, released_); !rv) {
VLOG_LP(log_trace) << "exit writer annex mode because the result set is closed by the client";
break;
}
}
auto annex = queue_.front().get();
lock.unlock();
if (auto rv = annex->transfer(shm_resultset_wire_, released_); !rv) {
VLOG_LP(log_trace) << "exit writer annex mode because the result set is closed by the client";
break;
{
std::unique_lock<std::mutex> lock(mtx_queue_);

queue_.pop();
}
queue_pop();
}
std::atomic_thread_fence(std::memory_order_acq_rel);
thread_active_ = false;
Expand Down Expand Up @@ -194,17 +201,17 @@ class server_wire_container_impl : public server_wire_container
std::unique_lock<std::mutex> lock(mtx_queue_);

if (!queue_.empty()) {
current_annex_ = queue_.back().get();
if (current_annex_->is_room(length)) {
current_annex_->write(data, length);
auto* current_annex = queue_.back().get();
if (current_annex->is_room(length)) {
current_annex->write(data, length);
return;
}
}
VLOG_LP(log_trace) << "extend annex";
queue_.emplace(std::make_unique<annex>(datachannel_buffer_size_));
cnd_queue_.notify_one();
current_annex_ = queue_.back().get();
current_annex_->write(data, length);
auto* current_annex = queue_.back().get();
current_annex->write(data, length);
}
}
void flush() override {
Expand All @@ -213,7 +220,14 @@ class server_wire_container_impl : public server_wire_container
shm_resultset_wire_->flush();
return;
}
current_annex_->flush();
{
std::unique_lock<std::mutex> lock(mtx_queue_);

if (!queue_.empty()) {
auto* current_annex = queue_.back().get();
current_annex->flush();
}
}
}
void release(unq_p_resultset_wire_conteiner resultset_wire) override;
bool is_disposable() override { return !thread_active_; }
Expand All @@ -223,7 +237,6 @@ class server_wire_container_impl : public server_wire_container
resultset_wires_container_impl &resultset_wires_container_impl_;

std::queue<std::unique_ptr<annex>> queue_{};
annex* current_annex_{};
std::thread writer_thread_{};
bool annex_mode_{};
std::atomic_bool thread_invoked_{};
Expand All @@ -237,10 +250,6 @@ class server_wire_container_impl : public server_wire_container
std::size_t current_record_size{};

void write_complete();
void queue_pop() {
std::unique_lock<std::mutex> lock(mtx_queue_);
queue_.pop();
}
};
static void resultset_wire_deleter_impl(resultset_wire_container* resultset_wire) {
delete dynamic_cast<resultset_wire_container_impl*>(resultset_wire); // NOLINT
Expand Down Expand Up @@ -604,10 +613,12 @@ inline void server_wire_container_impl::resultset_wire_container_impl::release(u
resultset_wires_container_impl_.add_released_writer(std::move(resultset_wire_conteiner));
if (thread_invoked_) {
std::unique_lock<std::mutex> lock(mtx_queue_);

released_ = true;
cnd_queue_.notify_one();
if (current_annex_) {
current_annex_->notify();
if (!queue_.empty()) {
auto* current_annex = queue_.back().get();
current_annex->notify();
}
} else {
resultset_wires_container_impl_.write_complete();
Expand Down

0 comments on commit 39db4ff

Please sign in to comment.