Skip to content

Commit

Permalink
Merge pull request #4625 from pwojcikdev/vote-generator-inproc
Browse files Browse the repository at this point in the history
Avoid creating temporary inproc channels
  • Loading branch information
pwojcikdev authored May 16, 2024
2 parents 58e1daa + 2e025fe commit f7aa48c
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 86 deletions.
4 changes: 2 additions & 2 deletions nano/core_test/fair_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ TEST (fair_queue, cleanup)
ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 1);
ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1);

// Either closing or resetting the channel should make it eligible for cleanup
// Only closing the channel should make it eligible for cleanup
channel1->close ();
channel2.reset ();

Expand All @@ -275,5 +275,5 @@ TEST (fair_queue, cleanup)
ASSERT_TRUE (queue.periodic_update (0s));

ASSERT_TRUE (queue.empty ());
ASSERT_EQ (queue.queues_size (), 1);
ASSERT_EQ (queue.queues_size (), 2);
}
99 changes: 17 additions & 82 deletions nano/node/fair_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,56 +18,29 @@ template <typename Request, typename Source>
class fair_queue final
{
public:
struct origin
{
Source source;
std::shared_ptr<nano::transport::channel> channel;

origin (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) :
source{ source },
channel{ channel }
{
}
};

private:
/**
* Holds user supplied source type(s) and an optional channel. This is used to uniquely identify and categorize the source of a request.
*/
struct origin_entry
struct origin
{
Source source;

// Optional is needed to distinguish between a source with no associated channel and a source with an expired channel
// TODO: Store channel as shared_ptr after networking fixes are done
std::optional<std::weak_ptr<nano::transport::channel>> maybe_channel;
// This can be null for some sources (eg. local RPC) to indicate that the source is not associated with a channel.
std::shared_ptr<nano::transport::channel> channel;

origin_entry (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) :
source{ source }
origin (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) :
source{ source },
channel{ std::move (channel) }
{
if (channel)
{
maybe_channel = std::weak_ptr{ channel };
}
}

origin_entry (origin const & origin) :
origin_entry (origin.source, origin.channel)
{
}
origin (origin const & origin) = default;

bool alive () const
{
if (maybe_channel)
if (channel)
{
if (auto channel_l = maybe_channel->lock ())
{
return channel_l->alive ();
}
else
{
return false;
}
return channel->alive ();
}
else
{
Expand All @@ -76,49 +49,10 @@ class fair_queue final
}
}

// TODO: Store channel as shared_ptr to avoid this mess
auto operator<=> (origin_entry const & other) const
{
// First compare source
if (auto cmp = source <=> other.source; cmp != 0)
{
return cmp;
}

if (maybe_channel && other.maybe_channel)
{
// Then compare channels by ownership, not by the channel's value or state
std::owner_less<std::weak_ptr<nano::transport::channel>> less;
if (less (*maybe_channel, *other.maybe_channel))
{
return std::strong_ordering::less;
}
if (less (*other.maybe_channel, *maybe_channel))
{
return std::strong_ordering::greater;
}
}
else
{
if (maybe_channel && !other.maybe_channel)
{
return std::strong_ordering::greater;
}
if (!maybe_channel && other.maybe_channel)
{
return std::strong_ordering::less;
}
}

return std::strong_ordering::equivalent;
}

operator origin () const
{
return { source, maybe_channel ? maybe_channel->lock () : nullptr };
}
auto operator<=> (origin const & other) const = default;
};

private:
struct entry
{
using queue_t = std::deque<Request>;
Expand Down Expand Up @@ -264,6 +198,7 @@ class fair_queue final
value_type next ()
{
release_assert (!empty ()); // Should be checked before calling next
debug_assert ((std::chrono::steady_clock::now () - last_update) < 60s); // The queue should be cleaned up periodically

if (should_seek ())
{
Expand All @@ -283,6 +218,8 @@ class fair_queue final

std::deque<value_type> next_batch (size_t max_count)
{
periodic_update ();

auto const count = std::min (size (), max_count);

std::deque<value_type> result;
Expand Down Expand Up @@ -358,13 +295,11 @@ class fair_queue final
}

private:
std::map<origin_entry, entry> queues;
typename std::map<origin_entry, entry>::iterator iterator{ queues.end () };
std::map<origin, entry> queues;
typename std::map<origin, entry>::iterator iterator{ queues.end () };
size_t counter{ 0 };

size_t total_size{ 0 };

std::chrono::steady_clock::time_point last_update{};
std::chrono::steady_clock::time_point last_update{ std::chrono::steady_clock::now () };

public:
std::unique_ptr<container_info_component> collect_container_info (std::string const & name) const
Expand Down
5 changes: 3 additions & 2 deletions nano/node/vote_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::
stats (stats_a),
logger (logger_a),
is_final (is_final_a),
vote_generation_queue{ stats, nano::stat::type::vote_generator, nano::thread_role::name::vote_generator_queue, /* single threaded */ 1, /* max queue size */ 1024 * 32, /* max batch size */ 1024 * 4 }
vote_generation_queue{ stats, nano::stat::type::vote_generator, nano::thread_role::name::vote_generator_queue, /* single threaded */ 1, /* max queue size */ 1024 * 32, /* max batch size */ 1024 * 4 },
inproc_channel{ std::make_shared<nano::transport::inproc::channel> (node, node) }
{
vote_generation_queue.process_batch = [this] (auto & batch) {
process_batch (batch);
Expand Down Expand Up @@ -250,7 +251,7 @@ void nano::vote_generator::broadcast_action (std::shared_ptr<nano::vote> const &
{
network.flood_vote_pr (vote_a);
network.flood_vote (vote_a, 2.0f);
vote_processor.vote (vote_a, std::make_shared<nano::transport::inproc::channel> (node, node)); // TODO: Avoid creating a temporary channel each time
vote_processor.vote (vote_a, inproc_channel);
}

void nano::vote_generator::run ()
Expand Down
1 change: 1 addition & 0 deletions nano/node/vote_generator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,6 @@ class vote_generator final
std::deque<candidate_t> candidates;
std::atomic<bool> stopped{ false };
std::thread thread;
std::shared_ptr<nano::transport::channel> inproc_channel;
};
}

0 comments on commit f7aa48c

Please sign in to comment.