Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/intro content filters #31

Open
wants to merge 7 commits into
base: release/6.0.1.8
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions 4_keys_instances/c++98/application.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,6 @@ inline void print_station_kind(StationKind station_kind)
case COCOA_BUTTER_CONTROLLER:
std::cout << "COCOA_BUTTER_CONTROLLER";
break;
case COCOA_LIQUOR_CONTROLLER:
std::cout << "COCOA_LIQUOR_CONTROLLER";
break;
case VANILLA_CONTROLLER:
std::cout << "VANILLA_CONTROLLER";
break;
Expand Down
5 changes: 2 additions & 3 deletions 4_keys_instances/chocolate_factory.idl
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ struct Temperature {
// Kind of station processing the chocolate
enum StationKind {
INVALID_CONTROLLER,
SUGAR_CONTROLLER,
COCOA_BUTTER_CONTROLLER,
COCOA_LIQUOR_CONTROLLER,
VANILLA_CONTROLLER,
SUGAR_CONTROLLER,
MILK_CONTROLLER,
VANILLA_CONTROLLER,
TEMPERING_CONTROLLER
};

Expand Down
3 changes: 2 additions & 1 deletion 5_basic_qos/c++11/monitoring_ctrl_application.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ void monitor_temperature(dds::sub::DataReader<Temperature>& reader)
// returned when LoanedSamples destructor called.
dds::sub::LoanedSamples<Temperature> samples = reader.take();

// Receive updates from stations about the state of current lots
// Receive updates from tempering station about chocolate temperature.
// Only an error if over 32 degrees Fahrenheit.
for (const auto& sample : samples) {
if (sample.info().valid()) {
if (sample.data().degrees() > 32) {
Expand Down
3 changes: 0 additions & 3 deletions 5_basic_qos/c++98/application.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,6 @@ inline void print_station_kind(StationKind station_kind)
case COCOA_BUTTER_CONTROLLER:
std::cout << "COCOA_BUTTER_CONTROLLER";
break;
case COCOA_LIQUOR_CONTROLLER:
std::cout << "COCOA_LIQUOR_CONTROLLER";
break;
case VANILLA_CONTROLLER:
std::cout << "VANILLA_CONTROLLER";
break;
Expand Down
5 changes: 2 additions & 3 deletions 5_basic_qos/chocolate_factory.idl
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ struct Temperature {
// Kind of station processing the chocolate
enum StationKind {
INVALID_CONTROLLER,
SUGAR_CONTROLLER,
COCOA_BUTTER_CONTROLLER,
COCOA_LIQUOR_CONTROLLER,
VANILLA_CONTROLLER,
SUGAR_CONTROLLER,
MILK_CONTROLLER,
VANILLA_CONTROLLER,
TEMPERING_CONTROLLER
};

Expand Down
204 changes: 204 additions & 0 deletions 5_content_filters/c++11/ingredient_application.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* (c) Copyright, Real-Time Innovations, 2020. All rights reserved.
* RTI grants Licensee a license to use, modify, compile, and create derivative
* works of the software solely for use with RTI Connext DDS. Licensee may
* redistribute copies of the software provided that all such copies are subject
* to this license. The software is provided "as is", with no warranty of any
* type, including any warranty for fitness for any purpose. RTI is under no
* obligation to maintain or support the software. RTI shall not be liable for
* any incidental or consequential damages arising out of the use or inability
* to use the software.
*/

#include <iostream>
#include <thread>

#include <dds/pub/ddspub.hpp>
#include <dds/sub/ddssub.hpp>
#include <rti/util/util.hpp> // for sleep()
#include <rti/config/Logger.hpp> // for logging
// Or simply include <dds/dds.hpp>

#include "chocolate_factory.hpp"
#include "application.hpp" // Argument parsing

using namespace application;

// Ingredient application:
// 1) Subscribes to the lot state
// 2) "Processes" the lot. (In this example, that means sleep for a time)
// 3) After "processing" the lot, publishes an updated lot state

void process_lot(
const StationKind station_kind,
const std::map<StationKind, StationKind>& next_station,
dds::sub::DataReader<ChocolateLotState>& lot_state_reader,
dds::pub::DataWriter<ChocolateLotState>& lot_state_writer)
{
// Take all samples. Samples are loaned to application, loan is
// returned when LoanedSamples destructor called.
dds::sub::LoanedSamples<ChocolateLotState> samples =
lot_state_reader.take();

// Process lots waiting for tempering
for (const auto& sample : samples) {
if (!sample.info().valid() || shutdown_requested) {
break;
}

// No need to check that this is the next station: content filter
// ensures that the reader only receives lots with
// next_station == this station
std::cout << "Processing lot #" << sample.data().lot_id() << std::endl;

// Send an update that this station is processing lot
ChocolateLotState updated_state(sample.data());
updated_state.lot_status(LotStatusKind::PROCESSING);
updated_state.next_station(StationKind::INVALID_CONTROLLER);
updated_state.station(station_kind);
lot_state_writer.write(updated_state);

// "Processing" the lot.
rti::util::sleep(dds::core::Duration(5));

// Send an update that this station is done processing lot
updated_state.lot_status(LotStatusKind::COMPLETED);
updated_state.next_station(next_station.at(station_kind));
updated_state.station(station_kind);
lot_state_writer.write(updated_state);
}
} // The LoanedSamples destructor returns the loan

StationKind string_to_stationkind(const std::string& station_kind)
{
if (station_kind == "SUGAR_CONTROLLER") {
return StationKind::SUGAR_CONTROLLER;
} else if (station_kind == "COCOA_BUTTER_CONTROLLER") {
return StationKind::COCOA_BUTTER_CONTROLLER;
} else if (station_kind == "MILK_CONTROLLER") {
return StationKind::MILK_CONTROLLER;
} else if (station_kind == "VANILLA_CONTROLLER") {
return StationKind::VANILLA_CONTROLLER;
}
return StationKind::INVALID_CONTROLLER;
}

void run_example(unsigned int domain_id, const std::string& station_kind)
{
StationKind current_station = string_to_stationkind(station_kind);
std::cout << station_kind << " station starting" << std::endl;
// The stations are in a fixed order, this defines which station is next
const std::map<StationKind, StationKind> next_station {
{ StationKind::COCOA_BUTTER_CONTROLLER, StationKind::SUGAR_CONTROLLER },
{ StationKind::SUGAR_CONTROLLER, StationKind::MILK_CONTROLLER },
{ StationKind::MILK_CONTROLLER, StationKind::VANILLA_CONTROLLER },
{ StationKind::VANILLA_CONTROLLER, StationKind::TEMPERING_CONTROLLER }
};

// Loads the QoS from the qos_profiles.xml file.
dds::core::QosProvider qos_provider("./qos_profiles.xml");

// A DomainParticipant allows an application to begin communicating in
// a DDS domain. Typically there is one DomainParticipant per application.
// Uses TemperingApplication QoS profile to set participant name.
dds::domain::DomainParticipant participant(
domain_id,
qos_provider.participant_qos(
"ChocolateFactoryLibrary::TemperingApplication"));

// A Topic has a name and a datatype. Create Topics.
// Topic names are constants defined in the IDL file.
dds::topic::Topic<ChocolateLotState> lot_state_topic(
participant,
CHOCOLATE_LOT_STATE_TOPIC);
std::string filter_value = "'" + station_kind + "'";
dds::topic::ContentFilteredTopic<ChocolateLotState>
filtered_lot_state_topic(
lot_state_topic,
"FilteredLot",
dds::topic::Filter("next_station = %0", { filter_value }));

// A Publisher allows an application to create one or more DataWriters
// Create Publisher with default QoS
dds::pub::Publisher publisher(participant);

// Create DataWriter of Topic "ChocolateLotState"
// using ChocolateLotStateProfile QoS profile for State Data
dds::pub::DataWriter<ChocolateLotState> lot_state_writer(
publisher,
lot_state_topic,
qos_provider.datawriter_qos(
"ChocolateFactoryLibrary::ChocolateLotStateProfile"));

// A Subscriber allows an application to create one or more DataReaders
dds::sub::Subscriber subscriber(participant);

// Create DataReader of Topic "ChocolateLotState".
// using ChocolateLotStateProfile QoS profile for State Data
dds::sub::DataReader<ChocolateLotState> lot_state_reader(
subscriber,
filtered_lot_state_topic,
qos_provider.datareader_qos(
"ChocolateFactoryLibrary::ChocolateLotStateProfile"));

// Obtain the DataReader's Status Condition
dds::core::cond::StatusCondition reader_status_condition(lot_state_reader);

// Contains statuses that entities can be notified about
using dds::core::status::StatusMask;

// Enable the 'data available' and 'requested incompatible qos' statuses
reader_status_condition.enabled_statuses(StatusMask::data_available());

// Associate a handler with the status condition. This will run when the
// condition is triggered, in the context of the dispatch call (see below)
reader_status_condition.extensions().handler([&]() {
if ((lot_state_reader.status_changes() & StatusMask::data_available())
!= StatusMask::none()) {
process_lot(
current_station,
next_station,
lot_state_reader,
lot_state_writer);
}
});

// Create a WaitSet and attach the StatusCondition
dds::core::cond::WaitSet waitset;
waitset += reader_status_condition;

while (!shutdown_requested) {
// Wait for ChocolateLotState
std::cout << "Waiting for lot" << std::endl;
waitset.dispatch(dds::core::Duration(10)); // Wait up to 10s for update
}
}

int main(int argc, char *argv[])
{
// Parse arguments and handle control-C
auto arguments = parse_arguments(argc, argv);
if (arguments.parse_result == ParseReturn::PARSE_RETURN_EXIT) {
return EXIT_SUCCESS;
} else if (arguments.parse_result == ParseReturn::PARSE_RETURN_FAILURE) {
return EXIT_FAILURE;
}
setup_signal_handlers();

// Sets Connext verbosity to help debugging
rti::config::Logger::instance().verbosity(arguments.verbosity);

try {
run_example(arguments.domain_id, arguments.station_kind);
} catch (const std::exception& ex) {
// This will catch DDS exceptions
std::cerr << "Exception in run_example(): " << ex.what() << std::endl;
return EXIT_FAILURE;
}

// Releases the memory used by the participant factory. Optional at
// application shutdown
dds::domain::DomainParticipant::finalize_participant_factory();

return EXIT_SUCCESS;
}
Loading