Skip to content

Commit

Permalink
Merge pull request #1625 from FIWARE/dds/notifications
Browse files Browse the repository at this point in the history
Broker now subscribes to DDS notifications and pushes entities to its current state accordingly
  • Loading branch information
kzangeli authored Jun 8, 2024
2 parents 2a4befa + 9896bc5 commit 6f02c14
Show file tree
Hide file tree
Showing 19 changed files with 535 additions and 63 deletions.
19 changes: 16 additions & 3 deletions src/app/orionld/orionld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ extern "C"
#include "kalloc/kaBufferReset.h" // kaBufferReset
#include "kjson/kjBufferCreate.h" // kjBufferCreate
#include "kjson/kjFree.h" // kjFree
#include "kjson/kjBuilder.h" // kjChildAdd
#include "kjson/kjLookup.h" // kjLookup
#include "ktrace/kTrace.h" // trace messages - ktrace library
}
Expand Down Expand Up @@ -134,6 +135,7 @@ extern "C"
#include "orionld/troe/pgConnectionPoolsFree.h" // pgConnectionPoolsFree
#include "orionld/troe/pgConnectionPoolsPresent.h" // pgConnectionPoolsPresent
#include "orionld/distOp/distOpInit.h" // distOpInit
#include "orionld/dds/ddsInit.h" // ddsInit

#include "orionld/version.h"
#include "orionld/orionRestServices.h"
Expand Down Expand Up @@ -248,6 +250,8 @@ bool triggerOperation = false;
bool noprom = false;
bool noArrayReduction = false;
bool ddsSupport = false;
char ddsSubsTopics[512];
char ddsTopicType[512];



Expand Down Expand Up @@ -342,7 +346,9 @@ bool ddsSupport = false;
#define CORE_CONTEXT_DESC "core context version (v1.0|v1.3|v1.4|v1.5|v1.6|v1.7) - v1.6 is default"
#define NO_PROM_DESC "run without Prometheus metrics"
#define NO_ARR_REDUCT_DESC "skip JSON-LD Array Reduction"
#define USE_DDS "turn on DDS support"
#define USE_DDS_DESC "turn on DDS support"
#define DDS_SUBS_TOPICS_DESC "topics to subscribe to on DDS"
#define DDS_TOPIC_TYPE_DESC "DDS topic type"



Expand Down Expand Up @@ -448,7 +454,9 @@ PaArgument paArgs[] =
{ "-lmtmp", &lmtmp, "TMP_TRACES", PaBool, PaHid, true, false, true, TMPTRACES_DESC },
{ "-noprom", &noprom, "NO_PROM", PaBool, PaHid, false, false, true, NO_PROM_DESC },
{ "-noArrayReduction", &noArrayReduction, "NO_ARRAY_REDUCTION", PaBool, PaHid, false, false, true, NO_ARR_REDUCT_DESC },
{ "-dds", &ddsSupport, "DDS", PaBool, PaOpt, false, false, true, USE_DDS },
{ "-dds", &ddsSupport, "DDS", PaBool, PaOpt, false, false, true, USE_DDS_DESC },
{ "-ddsSubsTopics", ddsSubsTopics, "DDS_SUBS_TOPICS", PaString, PaOpt, _i "", PaNL, PaNL, DDS_SUBS_TOPICS_DESC },
{ "-ddsTopicType", ddsTopicType, "DDS_TOPIC_TYPE", PaString, PaOpt, _i "NGSI-LD", PaNL, PaNL, DDS_TOPIC_TYPE_DESC },

PA_END_OF_ARGS
};
Expand Down Expand Up @@ -497,7 +505,7 @@ void daemonize(void)
if (pid > 0)
{
isFatherProcess = true;
exit(0);
exit(0);
}

// Change the file mode mask */
Expand Down Expand Up @@ -1071,6 +1079,8 @@ int main(int argC, char* argV[])
exit(1);
}

ktVerbose = KTRUE;

coreContextUrl = coreContextUrlSetup(coreContextVersion);
if (coreContextUrl == NULL)
LM_X(1, ("Invalid version for the Core Context: %s (valid: v1.0|v1.3|v1.4|v1.5|v1.6|v1.7)", coreContextVersion));
Expand Down Expand Up @@ -1413,6 +1423,9 @@ int main(int argC, char* argV[])
if (pernot == true)
pernotLoopStart();

if (ddsSupport == true)
ddsInit(ddsTopicType, ddsSubsTopics, DDSOpModeDefault);

if (socketService == true)
{
int fd;
Expand Down
5 changes: 5 additions & 0 deletions src/lib/logMsg/traceLevels.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ typedef enum TraceLevels
LmtLegacySubMatch, // Old code - update/subscription match for subs/notifs
LmtLegacySubCacheRefresh, // Old code - sub-cache-refresh

//
// DDS
//
LmtDds = 230, // DDS

LmtCurl = 250, // CURL library
LmtToDo, // To Do list
LmtPatchEntity, // Real merge+patch
Expand Down
1 change: 1 addition & 0 deletions src/lib/orionld/common/orionldState.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ typedef struct OrionldConnectionState
char* entityId;
OrionldUriParamOptions uriParamOptions;
OrionldUriParams uriParams;
bool upsert;
char* errorAttributeArrayP;
char errorAttributeArray[512];
int errorAttributeArrayUsed;
Expand Down
2 changes: 2 additions & 0 deletions src/lib/orionld/dds/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ SET (SOURCES
DdsNotificationSender.cpp
ddsSubscribe.cpp
ddsPublish.cpp
ddsNotification.cpp
ddsInit.cpp
kjTreeLog.cpp
)

Expand Down
50 changes: 25 additions & 25 deletions src/lib/orionld/dds/DdsNotificationReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,8 @@ using namespace eprosima::fastdds::dds;

// -----------------------------------------------------------------------------
//
// ddsDumpArray - accumulating data from DDS notifications
// on_subscription_matched -
//
extern KjNode* ddsDumpArray;



void DdsNotificationReceiver::on_subscription_matched(DataReader*, const SubscriptionMatchedStatus& info)
{
if (info.current_count_change == 1)
Expand All @@ -68,6 +64,12 @@ void DdsNotificationReceiver::on_subscription_matched(DataReader*, const Subscri
KT_T(StDds, "'%d' is not a valid value for SubscriptionMatchedStatus current count change", info.current_count_change);
}



// -----------------------------------------------------------------------------
//
// on_data_available -
//
void DdsNotificationReceiver::on_data_available(DataReader* reader)
{
SampleInfo info;
Expand All @@ -91,7 +93,7 @@ void DdsNotificationReceiver::on_data_available(DataReader* reader)
//
// Accumulate notifications
//
KjNode* dump = kjObject(NULL, "item"); // No name as it is part of an array
KjNode* notification = kjObject(NULL, "item"); // No name as it is part of an array
KjNode* tenantP = (ngsildEntity_.tenant() != "")? kjString(NULL, "tenant", ngsildEntity_.tenant().c_str()) : NULL;
KjNode* idP = (ngsildEntity_.id() != "")? kjString(NULL, "id", ngsildEntity_.id().c_str()) : NULL;
KjNode* typeP = (ngsildEntity_.type() != "")? kjString(NULL, "type", ngsildEntity_.type().c_str()) : NULL;
Expand All @@ -100,12 +102,12 @@ void DdsNotificationReceiver::on_data_available(DataReader* reader)
KjNode* modifiedAtP = (ngsildEntity_.modifiedAt() != 0)? kjInteger(NULL, "modifiedAt", ngsildEntity_.modifiedAt()) : NULL;
char* attributes = (ngsildEntity_.attributes() != "")? (char*) ngsildEntity_.attributes().c_str() : NULL;

if (tenantP != NULL) kjChildAdd(dump, tenantP);
if (idP != NULL) kjChildAdd(dump, idP);
if (typeP != NULL) kjChildAdd(dump, typeP);
if (scopeP != NULL) kjChildAdd(dump, scopeP);
if (createdAtP != NULL) kjChildAdd(dump, createdAtP);
if (modifiedAtP != NULL) kjChildAdd(dump, modifiedAtP);
if (tenantP != NULL) kjChildAdd(notification, tenantP);
if (idP != NULL) kjChildAdd(notification, idP);
if (typeP != NULL) kjChildAdd(notification, typeP);
if (scopeP != NULL) kjChildAdd(notification, scopeP);
if (createdAtP != NULL) kjChildAdd(notification, createdAtP);
if (modifiedAtP != NULL) kjChildAdd(notification, modifiedAtP);

if (attributes != NULL)
{
Expand All @@ -114,26 +116,24 @@ void DdsNotificationReceiver::on_data_available(DataReader* reader)
// Initializing orionldState, to call kjParse (not really necessary, it's overkill)
orionldStateInit(NULL);

// parse the string 'attributes' and add all attributes to 'dump'
// parse the string 'attributes' and add all attributes to 'notification'
KjNode* attrsNode = kjParse(orionldState.kjsonP, attributes);
if (attrsNode != NULL)
attrsNode = kjClone(NULL, attrsNode);
KT_T(StDds, "After kjParse");

kjTreeLog2(attrsNode, "attrsNode", StDds);
kjTreeLog2(dump, "dump w/o attrs", StDds);
// Concatenate the attributes to the "dump entity"
dump->lastChild->next = attrsNode->value.firstChildP;
dump->lastChild = attrsNode->lastChild;
kjTreeLog2(dump, "dump with attrs", StDds);
// Concatenate the attributes to the "notification entity"
notification->lastChild->next = attrsNode->value.firstChildP;
notification->lastChild = attrsNode->lastChild;
}
else
KT_T(StDds, "Entity Id: %s has no attributes", ngsildEntity_.id().c_str());

if (ddsDumpArray == NULL)
ddsDumpArray = kjArray(NULL, "ddsDumpArray");

kjChildAdd(ddsDumpArray, dump);
if (callback_ != NULL)
{
KT_T(StDds, "Calling notification callback function");
callback_(notification);
}
else
KT_W("No notification callback function!");
}
}
}
24 changes: 18 additions & 6 deletions src/lib/orionld/dds/DdsNotificationReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,21 @@
#include "fastdds/dds/topic/TypeSupport.hpp"

#include "orionld/dds/config.h" // DDS_RELIABLE, ...
#include "orionld/dds/NgsildEntity.h" // NgsildEntity
#include "orionld/dds/kjTreeLog.h" // kjTreeLog2



using namespace eprosima::fastdds::dds;


// -----------------------------------------------------------------------------
//
// DdsNotificationFunction - callback for reception of DDS samples
//
typedef void (*DdsNotificationFunction)(KjNode* notificationP);



// -----------------------------------------------------------------------------
//
Expand All @@ -67,13 +75,17 @@ using namespace eprosima::fastdds::dds;
class DdsNotificationReceiver : public DataReaderListener
{
public:
DdsNotificationReceiver() : samples_(0) { }
~DdsNotificationReceiver() override { }
DdsNotificationReceiver() : samples_(0), callback_(NULL) { }
explicit DdsNotificationReceiver(DdsNotificationFunction callback) : samples_(0), callback_(callback) { }
~DdsNotificationReceiver() override { }

std::atomic_int samples_;

void on_subscription_matched(DataReader*, const SubscriptionMatchedStatus& info) override;
void on_data_available(DataReader* reader) override;

void on_subscription_matched(DataReader*, const SubscriptionMatchedStatus& info) override;
void on_data_available(DataReader* reader) override;
NgsildEntity ngsildEntity_;
std::atomic_int samples_;
NgsildEntity ngsildEntity_;
DdsNotificationFunction callback_;
};

#endif // SRC_LIB_ORIONLD_DDS_DDSNOTIFICATIONRECEIVER_H_
9 changes: 0 additions & 9 deletions src/lib/orionld/dds/NgsildPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,12 @@ bool NgsildPublisher::publish(KjNode* entityP)
++attempts;
}

KT_V("Publishing an entity");
if (listener_.ready_ == false)
{
KT_W("listener still not ready after waiting for %d milliseconds", attempts);
return false;
}

KT_V("Publishing an entity");
if (listener_.matched_ <= 0)
{
KT_W("listener not matched");
Expand All @@ -186,23 +184,20 @@ bool NgsildPublisher::publish(KjNode* entityP)
if (entityP == NULL)
KT_X(1, "entityP == NULL");

KT_V("Publishing an entity");
KjNode* tenantNodeP = kjLookup(entityP, "tenant");
KjNode* idNodeP = kjLookup(entityP, "id");
KjNode* typeNodeP = kjLookup(entityP, "type");
KjNode* scopeNodeP = kjLookup(entityP, "scope");
KjNode* createdAtNodeP = kjLookup(entityP, "createdAt");
KjNode* modifiedAtNodeP = kjLookup(entityP, "modifiedAt");

KT_V("Publishing an entity");
const char* tenant = (tenantNodeP != NULL)? tenantNodeP->value.s : NULL;
const char* id = (idNodeP != NULL)? idNodeP->value.s : NULL;
const char* type = (typeNodeP != NULL)? typeNodeP->value.s : NULL;
const char* scope = (scopeNodeP != NULL)? scopeNodeP->value.s : NULL;
const long long createdAt = (createdAtNodeP != NULL)? createdAtNodeP->value.i : 0;
const long long modifiedAt = (modifiedAtNodeP != NULL)? modifiedAtNodeP->value.i : 0;

KT_V("Publishing an entity");
if (tenantNodeP != NULL) kjChildRemove(entityP, tenantNodeP);
if (idNodeP != NULL) kjChildRemove(entityP, idNodeP);
if (typeNodeP != NULL) kjChildRemove(entityP, typeNodeP);
Expand All @@ -211,19 +206,15 @@ bool NgsildPublisher::publish(KjNode* entityP)
if (modifiedAtNodeP != NULL) kjChildRemove(entityP, modifiedAtNodeP);

// Only attributes left now
KT_V("Publishing an entity");
char* serialized = NULL;
if (entityP->value.firstChildP != NULL)
{
kjTreeLog2(entityP, "Entity to publish", StDds);

int size = kjFastRenderSize(entityP);

KT_V("Publishing an entity");
serialized = (char*) malloc(size * 2 + 256); // free? :)
KT_V("Publishing an entity");
kjFastRender(entityP, serialized);
KT_V("Publishing an entity");
}

KT_V("tenant: '%s'", tenant);
Expand Down
3 changes: 2 additions & 1 deletion src/lib/orionld/dds/NgsildSubscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,13 @@ class NgsildSubscriber
DdsNotificationReceiver listener_;

public:
explicit NgsildSubscriber(const char* topicType)
explicit NgsildSubscriber(const char* topicType, DdsNotificationFunction callback)
: participant_(nullptr)
, subscriber_(nullptr)
, reader_(nullptr)
, topic_(nullptr)
, type_(new NgsildEntityPubSubType(topicType))
, listener_(callback)
{
}

Expand Down
74 changes: 74 additions & 0 deletions src/lib/orionld/dds/ddsInit.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
*
* Copyright 2024 FIWARE Foundation e.V.
*
* This file is part of Orion-LD Context Broker.
*
* Orion-LD Context Broker is free software: you can redistribute it and/or
* modify it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* Orion-LD Context Broker is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
* General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/.
*
* For those usages not covered by this license please contact with
* orionld at fiware dot org
*
* Author: Ken Zangelin
*/
extern "C"
{
#include "ktrace/kTrace.h" // trace messages - ktrace library
#include "kbase/kStringSplit.h" // kStringSplit
}

#include "orionld/common/orionldState.h" // orionldState, kjTreeLog
#include "orionld/dds/ddsSubscribe.h" // ddsSubscribe
#include "orionld/dds/ddsNotification.h" // ddsNotification
#include "orionld/dds/ddsInit.h" // Own interface



// -----------------------------------------------------------------------------
//
// ddsOpMode -
//
DdsOperationMode ddsOpMode;



// -----------------------------------------------------------------------------
//
// ddsInit - initialization function for DDS
//
// PARAMETERS
// * ddsTopicType
// * ddsSubsTopics
// * mode - the DDS mode the broker is working in
//
int ddsInit(const char* ddsTopicType, char* ddsSubsTopics, DdsOperationMode _ddsOpMode)
{
ddsOpMode = _ddsOpMode;

if (ddsSubsTopics[0] == 0)
return 0;

KT_V("topics: %s", ddsSubsTopics);
char* topicV[100];
int topics = kStringSplit(ddsSubsTopics, ',', topicV, 3);

KT_V("no of topics: %d", topics);
for (int ix = 0; ix < topics; ix++)
{
KT_V("Subscribing to DDS topic %s::%s", ddsTopicType, topicV[ix]);
ddsSubscribe(ddsTopicType, topicV[ix], ddsNotification);
}

return 0;
}
Loading

0 comments on commit 6f02c14

Please sign in to comment.