Add support for built-in bridge to tedge-mapper-c8y #2716
if let Event::Incoming(Incoming::Publish(publish)) = &notification {
    cloud_client
        .publish(
            publish.topic.strip_prefix(&topic_prefix).unwrap(),
Rather than hard-coding some mapping inside the bridge logic, I would consider passing a function that converts a source topic to a target topic.
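To make the suggestion concrete, here is a minimal sketch of a forwarding step that receives the topic conversion as a parameter. The names `local_to_cloud`, `cloud_to_local` and `target_topic` are hypothetical, and the hard-coded `c8y/` prefix is only an assumption for illustration.

```rust
use std::borrow::Cow;

/// Hypothetical local-to-cloud conversion: drops the local `c8y/` prefix.
fn local_to_cloud(topic: &str) -> Cow<'_, str> {
    match topic.strip_prefix("c8y/") {
        Some(rest) => Cow::Borrowed(rest),
        // Topics without the prefix are passed through unchanged.
        None => Cow::Borrowed(topic),
    }
}

/// Hypothetical cloud-to-local conversion: adds the local `c8y/` prefix.
fn cloud_to_local(topic: &str) -> Cow<'_, str> {
    Cow::Owned(format!("c8y/{topic}"))
}

/// The bridge logic only knows that some conversion exists;
/// the mapping itself is supplied by the caller.
fn target_topic<F>(source_topic: &str, transform_topic: F) -> String
where
    F: for<'a> Fn(&'a str) -> Cow<'a, str>,
{
    transform_topic(source_topic).into_owned()
}

fn main() {
    println!("{}", target_topic("c8y/s/us", local_to_cloud));
    println!("{}", target_topic("s/ds", cloud_to_local));
}
```

With this shape, each direction of the bridge can be given its own conversion without touching the forwarding code.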
Shouldn't we consider externalising the mapping rules, so that they are defined in an external config file (similar to what Mosquitto has done) and used for the mapping? That way, we can reuse the same bridge actor with all the cloud mappers using different config files. It also enables the user to extend the mapper with additional mapping rules for things like custom templates.
We need to think carefully about this. We don't need to make the mapping too open:
- Mappings which are critical to the function of the agent (e.g. c8y/s/ds and c8y/s/us etc.)
- Optional user-specified mappings (e.g. c8y/dc/<TemplateCollectionID> etc.)
The first type of mappings should not be influenced by the user as this would lead to a non-functional agent.
The second point is already configurable via the tedge.toml file, so we don't need a second level of configuration which converts the tedge.toml into a mapping rule file. We could still open up to allow users to subscribe/publish additional topics (this will be important for the Cumulocity IoT MQTT Connect feature), though we can still review the configuration mechanisms for the custom SmartREST templates etc.
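As a rough illustration of keeping the two kinds of mappings apart, the sketch below builds the bridge topic list from a fixed core set plus user-supplied template IDs. `CORE_TOPICS` and `bridge_topics` are hypothetical names, and the topic shapes are simply copied from the examples above.

```rust
/// Core topics the agent cannot work without; not user-configurable.
const CORE_TOPICS: [&str; 2] = ["s/ds", "s/us"];

/// Builds the full topic list: core topics first, then one topic per
/// user-configured template collection ID (hypothetical input).
fn bridge_topics(prefix: &str, template_ids: &[&str]) -> Vec<String> {
    let mut topics: Vec<String> = CORE_TOPICS
        .iter()
        .map(|t| format!("{prefix}/{t}"))
        .collect();
    for id in template_ids {
        let topic = format!("{prefix}/dc/{id}");
        // User entries can only add topics, never replace or remove core ones.
        if !topics.contains(&topic) {
            topics.push(topic);
        }
    }
    topics
}

fn main() {
    for topic in bridge_topics("c8y", &["templateA"]) {
        println!("{topic}");
    }
}
```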
> The first type of mappings should not be influenced by the user as this would lead to a non-functional agent
Yeah definitely, the core and extended mapping rules must be separate to avoid the user messing up the core rules. How about keeping the core mapping rules in a separate file, which should not be directly manipulated by the user for any extensions?
Keeping the mapping rules outside the Rust code makes the development of a new mapper easier (without even needing to know Rust), right? The goal was to achieve both "mqtt broker independence" and "generic mapping" with a single design.
But we don't need an intermediate file format...we only used that in the past as it was required by mosquitto, but now that we are in control of it, we don't need it.
We still need to decide what the generic mapping looks like, as I think it is too early to tell if this is configuration, templates or code modules.
In my mind, the primary reason for the existence of the mapping rules we have in any case is so tedge-mapper can function correctly. It therefore seems a bit daft to me to store these outside of the mapper code (though they will need relocating to cope with non-Cumulocity clouds).
Additionally, I think the main hindrance to "generic mapping" at the moment is merely the fact that it requires additional effort for someone to update mosquitto configurations, and so long as we don't need to do that, I don't think we're likely to introduce significant barriers to that being a possibility in the future. In general, it seems like a bad idea to try to tackle both replacing the existing bridge configuration with something equivalent in the mapper and adding new features to the bridge at the same time (beyond trivial matters like automatically subscribing the bridge to new custom SmartREST topics).
> In my mind, the primary reason for the existence of the mapping rules we have in any case is so tedge-mapper can function correctly. It therefore seems a bit daft to me to store these outside of the mapper code (though they will need relocating to cope with non-Cumulocity clouds).
Externalizing the bridge rules is not needed if we want to keep things in the current form, with dedicated mapper processes for each cloud. But, with the desire to have a "generic mapper" eventually, which I'd assume will cater to all the clouds, an externalized config file seemed like the easiest way, especially to enable no-code development of mappers.
> But we don't need an intermediate file format...we only used that in the past as it was required by mosquitto, but now that we are in control of it, we don't need it.
Good point. This makes me wonder if we should reconsider the current mapping/bridging approach as well. So far, we needed bridge topics like c8y/# and az/# just because the bridge was external. So, the mapper was forced to send the mapped messages to the broker so that the external bridge client could pick them up. But now that we have the converter and the bridge in the same process, the messages between the converter actor and the bridge actor can be exchanged via in-memory channels (message boxes) rather than the broker, eliminating all the serializing/deserializing cost and the network hops via the broker.
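A minimal sketch of that idea, using plain threads and `std::sync::mpsc` in place of the actor message boxes. `MappedMessage`, `convert` and `run_pipeline` are hypothetical stand-ins, not the mapper's actual types.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical message handed from the converter to the bridge.
#[derive(Debug, Clone, PartialEq)]
struct MappedMessage {
    topic: String,
    payload: String,
}

/// Stand-in for the converter: here it only retargets the message to `s/us`.
fn convert(payload: &str) -> MappedMessage {
    MappedMessage {
        topic: "s/us".to_string(),
        payload: payload.to_string(),
    }
}

/// Runs converter and bridge as two threads joined by an in-memory channel
/// and returns what the bridge would publish to the cloud.
fn run_pipeline(inputs: Vec<String>) -> Vec<MappedMessage> {
    let (tx, rx) = mpsc::channel::<MappedMessage>();
    let converter = thread::spawn(move || {
        for payload in inputs {
            tx.send(convert(&payload)).expect("bridge hung up");
        }
        // `tx` is dropped here, which closes the channel.
    });
    // The bridge drains the channel until the converter closes it.
    let bridge = thread::spawn(move || rx.into_iter().collect::<Vec<_>>());
    converter.join().unwrap();
    bridge.join().unwrap()
}

fn main() {
    let published = run_pipeline(vec!["200,CustomMeasurement,temperature,25".to_string()]);
    println!("{published:?}");
}
```

No message in this sketch touches a broker between conversion and publication, which is the saving described above.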
Obviously, this would be a much bigger change and the approach taken in this PR can be considered as the first step towards that goal.
> We still need to decide what the generic mapping looks like, as I think it is too early to tell if this is configuration, templates or code modules.
Fair point. I was just trying to see if there's a common ground here.
In my head, the generic mapper is something that behaves like (not directly using) the mosquitto bridge, but with a more sophisticated mapping rule-set for topic/payload manipulations, beyond the simple topic-to-topic bridging offered by mosquitto. So, an external config file with just the bridge config aspects felt like a good first step, which can later be enhanced with the relevant mapping rules as well.
But, having this conversation, I get a feeling that we all have different views on how that generic mapper would work, and hence maybe it's too early to discuss those aspects under the scope of this ticket.
Branch updated from abbc4b8 to 466e179.
A major step is reached: implementing a bridge is feasible using rumqttc.
async fn one_way_bridge<F: for<'a> Fn(&'a str) -> Cow<'a, str>>(
    mut recv_event_loop: EventLoop,
    target: AsyncClient,
    transform_topic: F,
    mut tx_pubs: mpsc::Sender<Publish>,
    mut rx_pubs: mpsc::Receiver<Publish>,
) {
A nicely tied knot. A gem!
However, it took me some time to convince myself that this is working.
- From the names, it's not obvious that tx_pubs and rx_pubs are not a (tx, rx) channel pair, but a sender and a receiver connected to their counterparts in another instance of one_way_bridge.
- In fact, this function cannot be understood in isolation, but only paired with another instance handling the flow the other way around.
- It's also not obvious how the pkid of the messages published on the target is retrieved, as this is done by the companion loop. The companion loop will also acknowledge the reception of the messages received by this one. Perfect, but not straightforward.
How can this be improved?
- A comment must definitely be added saying that an instance of this loop can only be understood together with its companion loop.
- Some ASCII art might help to grasp the relationship between two such loops.
- I would also consider adding some debug_assert!, notably to check that the reception of a message is acknowledged on the correct connection (i.e. on the one_way_bridge receiving messages from c8y/# topics, only messages sent to this topic should be acknowledged).
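The crossed wiring described in the first point can be sketched with plain `std::sync::mpsc` channels. `BridgeHalf` and `bridge_halves` are hypothetical names, and `String` stands in for the `Publish` type.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// One direction of the bridge: its sender and receiver belong to two
/// different channels, each shared with the companion half.
struct BridgeHalf {
    to_companion: Sender<String>,
    from_companion: Receiver<String>,
}

/// Creates two channels and crosses them, so that whatever one half sends
/// is received by the other half, never by itself.
fn bridge_halves() -> (BridgeHalf, BridgeHalf) {
    let (tx_a, rx_a) = channel();
    let (tx_b, rx_b) = channel();
    (
        BridgeHalf { to_companion: tx_a, from_companion: rx_b },
        BridgeHalf { to_companion: tx_b, from_companion: rx_a },
    )
}

fn main() {
    let (local_to_cloud, cloud_to_local) = bridge_halves();
    local_to_cloud.to_companion.send("publish #1".to_string()).unwrap();
    // The companion half sees the message; the sending half does not.
    println!("{:?}", cloud_to_local.from_companion.recv());
    println!("{:?}", local_to_cloud.from_companion.try_recv());
}
```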
There was some equivalent to the assertion with a call to unwrap() when calling forward_pkid_to_received_msg.remove(). By unwrapping the value, we ensure the packet id is known, and this should only happen reliably if everything is working correctly. If we have accidentally read packet ids for the cloud when we should be reading them for the local connection, one of the directions will quickly encounter an acknowledgement for an unknown packet id. I've currently removed this since I'm confident the loop now works, and I think this particular bug is difficult to achieve due to the required input to the function.
I've also modified the channels to be a single bidirectional channel to avoid exposing states which don't make sense, and (mostly) documented the function (still a little bit to do there, and ASCII art will likely help).
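For readers following along, the bookkeeping behind forward_pkid_to_received_msg can be sketched as follows. `AckTracker` and `Received` are hypothetical stand-ins for the real state and the rumqttc `Publish` type; only the map name is taken from the PR.

```rust
use std::collections::HashMap;

/// Stand-in for a message received on one side of the bridge.
#[derive(Debug, Clone, PartialEq)]
struct Received {
    pkid: u16,
    topic: String,
}

/// Maps the packet id used when forwarding a message to the original
/// message, so the original can be acknowledged once the forward is acked.
#[derive(Default)]
struct AckTracker {
    forward_pkid_to_received_msg: HashMap<u16, Received>,
}

impl AckTracker {
    /// Called when the forwarded publish has been assigned a packet id.
    fn on_outgoing_publish(&mut self, forward_pkid: u16, original: Received) {
        self.forward_pkid_to_received_msg.insert(forward_pkid, original);
    }

    /// Called when an acknowledgement arrives. Returns the original message
    /// to acknowledge, or None if the packet id is unknown, which would
    /// point at acknowledgements being read from the wrong connection.
    fn on_ack(&mut self, ack_pkid: u16) -> Option<Received> {
        self.forward_pkid_to_received_msg.remove(&ack_pkid)
    }
}

fn main() {
    let mut tracker = AckTracker::default();
    let original = Received { pkid: 3, topic: "c8y/s/us".to_string() };
    tracker.on_outgoing_publish(7, original);
    println!("{:?}", tracker.on_ack(7)); // the original message
    println!("{:?}", tracker.on_ack(7)); // None: already acknowledged
}
```

Unwrapping the result of `on_ack` would give the assertion-like behaviour described above; returning the `Option` leaves that decision to the caller.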
> I've also modified the channels to be a single bidirectional channel to avoid exposing states which don't make sense
This is indeed a good idea.
Here is some ascii art describing how the bridge works https://gist.github.com/didier-wenzek/d37d7cdb8bdbd5b0b572f6fb81d42801
Event::Outgoing(Outgoing::Publish(pkid)) => {
    if let Some(msg) = rx_pubs.next().await {
        forward_pkid_to_received_msg.insert(pkid, msg);
    } else {
        break;
    }
}
Is this really robust, as this requires a perfect synchronization between two event streams? Sure, these streams are not independent (the companion loop triggers one event for this rx_pubs and publishes a message triggering the Outgoing event). But I would be cautious here, mainly because of QoS subtleties. Do we have such an outgoing event for QoS 0? If not, then no message should be propagated to the companion loop.
I replied to this mostly in #2716 (comment). I think the answer is yes, we are robust as QoS 0 messages do go through the same series of events, just the packet identifiers are all 0.
> just the packet identifiers are all 0
Okay then
    )
    .await
    .unwrap();
tx_pubs.send(publish).await.unwrap();
Do we also have to do this for QoS 0?
Probably not. I've modified the code to not send None in the case of QoS 0, but https://docs.rs/rumqttc/latest/rumqttc/enum.Outgoing.html#variant.Publish suggests that QoS 0 messages will have an associated Publish event with a packet id of 0, so in reality this would be unlikely to cause any issues.
What I'm less sure about is how, if at all, we may wish to protect ourselves against misbehaving MQTT clients. If we aren't receiving acknowledgements, this could result in the map from packet id to original message growing without bound.
> What I'm less sure about is how, if at all, we may wish to protect ourselves against misbehaving MQTT clients. If we aren't receiving acknowledgements, this could result in the map from packet id to original message growing an unbounded amount.
This can be controlled by setting a bound on the number of in-flight messages.
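One way to picture such a bound, independent of how the MQTT client itself limits in-flight messages, is a pending map that refuses new entries once full. `BoundedInflight` and its methods are hypothetical; this is a sketch of the idea, not the mechanism used in the PR.

```rust
use std::collections::HashMap;

/// Pending acknowledgements with a hard upper bound on their number.
struct BoundedInflight {
    max_inflight: usize,
    pending: HashMap<u16, String>,
}

impl BoundedInflight {
    fn new(max_inflight: usize) -> Self {
        Self { max_inflight, pending: HashMap::new() }
    }

    /// Tracks a forwarded message, or hands it back when the bound is
    /// reached so the caller can stop reading until acks arrive.
    fn try_track(&mut self, pkid: u16, msg: String) -> Result<(), String> {
        if self.pending.len() >= self.max_inflight {
            return Err(msg);
        }
        self.pending.insert(pkid, msg);
        Ok(())
    }

    /// Frees a slot when the acknowledgement for `pkid` arrives.
    fn ack(&mut self, pkid: u16) -> Option<String> {
        self.pending.remove(&pkid)
    }
}

fn main() {
    let mut inflight = BoundedInflight::new(1);
    println!("{:?}", inflight.try_track(1, "first".to_string()));
    println!("{:?}", inflight.try_track(2, "second".to_string()));
    inflight.ack(1);
    println!("{:?}", inflight.try_track(2, "second".to_string()));
}
```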
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Fixed. I forgot this API existed, but it was checking for the existence of the bridge config file, which obviously doesn't exist with the built-in bridge.
Despite the very detailed documentation on the message flow, the flow is still not fully clear to me. So, take my comments with a pinch of salt, as some of them may be irrelevant, coming out of my broken understanding.
Maybe it's just a naive view, but I still imagine this whole bridging as a single select! loop over the local and remote event loops, and I'm really struggling to move away from that mental model to what's proposed here.
BTW seeing the amount of changes required to have that c8y prefix pushed into various layers makes me miss the singletons available in other languages.
cloud_config.set_transport(Transport::tls_with_config(tls_config.into()));
let topic_prefix = format!("{prefix}/");
let (local_client, local_event_loop) = AsyncClient::new(local_config, 10);
let (cloud_client, cloud_event_loop) = AsyncClient::new(cloud_config, 10);
Hope this client is resilient to the cloud connection being down at the very moment this client is created.
It should be. The client won't connect to the cloud until the event loop is polled AFAIK, and that definitely cleanly handles disconnection/reconnection/pinging (you just keep polling it and it will do its best to keep the connection healthy).
let (err, health_payload) = match result {
    Ok(_) => {
        info!("MQTT bridge connected to {name} broker");
        (None, MQTT_BRIDGE_UP_PAYLOAD)
I'm assuming that you're using 1 and 0 as health status just during the transition phase, to be consistent with the existing mosquitto bridge. Once the transition is complete, we can use the health messages in the tedge JSON format itself.
I've done that for now to get the existing code working, which checks if the bridge is healthy before running parts of the mapper. It would make sense to use a JSON format message (although that sounds like a breaking API change to me)
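As a small illustration of the current behaviour, the mapping from a connection result to the 1/0 health payload might look like the sketch below. MQTT_BRIDGE_UP_PAYLOAD appears in the diff; `MQTT_BRIDGE_DOWN_PAYLOAD`, the literal values and `health_update` are assumptions made for this example.

```rust
// "1" and "0" mirror the status values published by the mosquitto bridge.
const MQTT_BRIDGE_UP_PAYLOAD: &str = "1";
// Assumed counterpart of the constant above; the name is hypothetical.
const MQTT_BRIDGE_DOWN_PAYLOAD: &str = "0";

/// Turns the outcome of polling the connection into the last error
/// (None when healthy) and the payload to publish on the health topic.
fn health_update(result: Result<(), String>) -> (Option<String>, &'static str) {
    match result {
        Ok(()) => (None, MQTT_BRIDGE_UP_PAYLOAD),
        Err(err) => (Some(err), MQTT_BRIDGE_DOWN_PAYLOAD),
    }
}

fn main() {
    println!("{:?}", health_update(Ok(())));
    println!("{:?}", health_update(Err("connection refused".to_string())));
}
```

Switching to a JSON health message later would only change the payloads returned here, not the shape of the match.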
> It would make sense to use a JSON format message (although that sounds like a breaking API change to me)
Since the health endpoint service name itself is new (tedge-mapper-bridge-c8y and not mosquitto-c8y-bridge), it should be fine.
> > It would make sense to use a JSON format message (although that sounds like a breaking API change to me)
> Since the health endpoint service name itself is new (tedge-mapper-bridge-c8y and not mosquitto-c8y-bridge), it should be fine.
Beware that the logic to differentiate a bridge-related topic is really weak:
https://github.com/thin-edge/thin-edge.io/blob/main/crates/extensions/c8y_mapper_ext/src/service_monitor.rs#L29
name,
health_topic,
target,
last_err: Some("dummy error".into()),
last_err: Some("dummy error".into()),
// Starting with a dummy error instead of None as None represents healthy,
// and we can't assume that the bridge is healthy right from the start
last_err: Some("dummy error".into()),
Just adding some commentary.
I think the dummy value is no longer necessary as I've changed when we log, so I'll set it to None and then it shouldn't need explanation.
| Incoming::PubRec(PubRec { pkid: ack_pkid }),
) => {
    if let Some(msg) = forward_pkid_to_received_msg.remove(&ack_pkid) {
        target.ack(&msg).await.unwrap();
Shouldn't target.ack(&msg).await.unwrap() be done for Incoming::PubComp as well? Or is that done automatically if we don't have an explicit message handler here?
https://github.com/bytebeamio/rumqtt/blob/bfc9f9a6932d10ad085354ea1600daf5437399d0/rumqttc/src/state.rs#L288-L290 shows that rumqttc always responds to PUBREL with PUBCOMP, regardless of whether manual_acks are enabled, so we don't have that level of control with rumqttc. By comparison, the equivalent handling for PUBLISH only responds with PUBREC if manual_acks is set to false: https://github.com/bytebeamio/rumqtt/blob/bfc9f9a6932d10ad085354ea1600daf5437399d0/rumqttc/src/state.rs#L219-L222
…o local broker Signed-off-by: James Rhodes <jarhodes314@gmail.com>
built-in bridge Signed-off-by: James Rhodes <jarhodes314@gmail.com>
…6e17 Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Approved. Thank you for this nice step forward.
At least two points have to be addressed in following PRs:
- Fix device certificate ownership when running the built-in bridge.
- Support TLS connections on the local bus.
@@ -57,6 +57,7 @@ impl CreateCertCmd {

let cert = KeyCertPair::new_selfsigned_certificate(config, &self.id, key_kind)?;

// TODO cope with broker user being tedge
Indeed, I noticed that with the built-in bridge the mapper must run as root, simply because of this key.
We will need follow-up PRs to have the built-in bridge working as nicely as the former bridge.
The mapper shouldn't need to run as root. The latest changes modify tedge reconnect c8y to cope with this case, changing the ownership of existing certificates to tedge, but I didn't bother to make the same changes here since it wasn't necessary and is more involved to test (and really does want system testing, so in my mind it's best to do these both in a follow-up PR).
Execute Command    tedge mqtt pub ${C8Y_TOPIC_PREFIX}/s/us '200,CustomMeasurement,temperature,25'
${measurements}=    Device Should Have Measurements    minimum=1    maximum=1    type=CustomMeasurement    series=temperature
Log    ${measurements}
Execute Command    systemctl start tedge-mapper-c8y
On mapper restart, the messages sent in the meantime should be pushed.
Execute Command    systemctl start tedge-mapper-c8y
Execute Command    systemctl start tedge-mapper-c8y
${measurements}=    Device Should Have Measurements    minimum=2    maximum=2    type=CustomMeasurement    series=temperature
I appreciate this is the desired behaviour, although adding this line indicates that the behaviour isn't currently working as intended, so I'll omit it for now and create a follow up issue
Set Suite Variable    $DEVICE_SN
Device Should Exist    ${DEVICE_SN}
ThinEdgeIO.Execute Command    tedge config set c8y.bridge.built_in true
ThinEdgeIO.Execute Command    tedge config set c8y.bridge.topic_prefix custom-c8y-prefix
Bearing in mind that you use a variable for the prefix, I would use it here as well, like:
ThinEdgeIO.Execute Command    tedge config set c8y.bridge.topic_prefix ${C8Y_TOPIC_PREFIX}
This way the test can be used with custom prefixes given at runtime
(e.g. robot -v C8Y_TOPIC_PREFIX:some prefix thin-edge_device_telemetry_built-in_bridge.robot)
Proposed changes
This adds the bridge connection to the mapper discussed in #2592. It is currently Cumulocity only, and doesn't support all the possible mosquitto connection restrictions, but does function correctly on a basic level.
It also adds support for a configurable topic prefix in the mapper bridge via the configuration c8y.bridge.topic_prefix.
In 556a661, I've added a (hidden) configuration c8y.bridge.built_in (originally c8y.bridge.in_mapper) to control where the bridge runs. When set to false, currently the default, this should work like it does now: the mosquitto bridge configuration is generated as before, and the mapper doesn't attempt to run a bridge. When set to true, the mapper will run the bridge components, and the mosquitto configuration will be deleted. Upgrading tedge and then running tedge reconnect c8y should now work, including the connectivity check and setting the permissions for the certificates. The system tests added in this PR verify this.
Types of changes
Paste Link to the issue
Checklist
- cargo fmt as mentioned in CODING_GUIDELINES
- cargo clippy as mentioned in CODING_GUIDELINES
Further comments