Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of Pipe to support dynamic tracks #51

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 36 additions & 28 deletions ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,30 @@ namespace eprosima {
namespace ddspipe {
namespace core {

/*
* IDEA:
* The pipe is no longer the main class that manages everything.
* The pipe becomes a passive agent that only store tracks and the logic to add new entities to those tracks.
*
* All the logic of allowed topics is moved to the upper user, in most cases to the main app.
* It allows to make it specific for participants or pps.
*
* All the logic of discovery is moved to the upper user, in most cases to the main app.
* It supports the creation of tracks without discovering entities, and the easy configuration whether to create
* them only with readers or always.
*
* The main app (Router, Spy, Recorder) must set these data and manage the logic of creation of tracks, what
* makes the pipe much more versatile.
* So far, in cases as Spy or Recorder, it exist a simulated endpoint that is added to the discovery database
* only to create a track. Now it would only require to call a function of the Pipe.
*/

// This may be the track itself with functions to get internal info.
// Doing so may allow the participants to directly register writers in such track.
using TrackInfo = std::pair<ITopic, ParticipantId>;

using OnTrackCreationCallback = std::function<bool(TrackInfo, std::shared_ptr<IWriter>)>;

/**
* TODO
*/
Expand Down Expand Up @@ -59,12 +83,7 @@ class DdsPipe
*/
DDSPIPE_CORE_DllAPI
DdsPipe(
const std::shared_ptr<AllowedTopicList>& allowed_topics,
const std::shared_ptr<DiscoveryDatabase>& discovery_database,
const std::shared_ptr<PayloadPool>& payload_pool,
const std::shared_ptr<ParticipantsDatabase>& participants_database,
const std::shared_ptr<utils::SlotThreadPool>& thread_pool,
const std::set<utils::Heritable<types::DistributedTopic>>& builtin_topics = {},
bool start_enable = false);

/**
Expand Down Expand Up @@ -124,6 +143,18 @@ class DdsPipe
DDSPIPE_CORE_DllAPI
utils::ReturnCode disable() noexcept;

// Registering here a callback make the pipe to call this callback with each create_track call.
// This also removes is_repeater from IParticipant, as the participant itself would already know if want to
// add a writer to its own track.
void register_on_track_creation_callback(OnTrackCreationCallback callback);

// Create a track that consume (put data in) data from this reader.
void create_track(ITopic topic, IReader reader);

std::set<TrackInfo> get_tracks(ITopic topic);

void add_to_track(TrackInfo track, IWriter writer);

protected:

/////////////////////////
Expand Down Expand Up @@ -264,29 +295,6 @@ class DdsPipe
//! List of allowed and blocked topics
std::shared_ptr<AllowedTopicList> allowed_topics_;

/**
* @brief Common discovery database
*
* This object is shared by every Participant.
* Every time an endpoint is discovered by any Participant, it should be
* added to the database.
*/
std::shared_ptr<DiscoveryDatabase> discovery_database_;

/**
* @brief Common payload pool where every payload will be stored
*
* This payload will be shared by every endpoint.
* Every reader will store its data in the pool, the track will pass this
* data to the writers, that will release it after used.
*/
std::shared_ptr<PayloadPool> payload_pool_;

/**
* @brief Object that stores every Participant running in the DdsPipe
*/
std::shared_ptr<ParticipantsDatabase> participants_database_;

//! Thread Pool for tracks
std::shared_ptr<utils::SlotThreadPool> thread_pool_;

Expand Down
Loading