Support different clouds in built-in bridge #2821
I didn't fully get the benefit of using the raw `&str` form of `Topic`s and `TopicFilter`s in the newly added APIs. It felt like an eventual conversion to a `Topic` is unavoidable and hence the benefits of using `Cow<&str>` might not be significant.
Also, as discussed offline, the AWS bridge also needs to be added.
match (prefix, target) {
    (prefix, target) if prefix.is_empty() => target,
    (prefix, target) if target.is_empty() => prefix.clone(),
    (prefix, Cow::Borrowed(target)) => format!("{prefix}{target}").into(),
Since this is the case that's going to be used 90% of the time (since we don't pass through any of the messages AFAIK), I'm not sure if the overhead of using `Cow` here and in `BridgeRule` is worth it. But that's no argument to undo something that's already written in an optimal way and isn't overly complex either. So, let it be. At least it was good to learn a practical use case of `Cow`.

This does make a difference (albeit not a particularly notable one). Most of the Cumulocity rules make use of this, for instance. They are `&'static str` to begin with as the topics don't change, and the prefix is empty.
It's perhaps also worth pointing out that all we care about here is not adding a prefix, as removing a prefix is trivially done by borrowing a portion of the string slice and therefore never involves cloning.
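To make the borrowing argument concrete, here is a minimal sketch, assuming a simplified `prepend` modelled on the `match` quoted above and a hypothetical `remove_prefix` helper (neither is the PR's actual code). It shows that an empty prefix hands back the borrowed topic untouched, and that removing a prefix only ever re-slices the input.

```rust
use std::borrow::Cow;

/// Simplified stand-in for the `prepend` discussed in this thread.
/// Only the last branch allocates a new `String`.
fn prepend<'a>(target: Cow<'a, str>, prefix: &Cow<'a, str>) -> Cow<'a, str> {
    if prefix.is_empty() {
        // Nothing to add: return the topic as-is, borrowed or owned.
        target
    } else if target.is_empty() {
        // Cloning a `Cow::Borrowed` copies a reference, not the string data.
        prefix.clone()
    } else {
        Cow::Owned(format!("{prefix}{target}"))
    }
}

/// Hypothetical helper: removing a prefix borrows a sub-slice of the input,
/// so it never clones.
fn remove_prefix<'a>(topic: &'a str, prefix: &str) -> Option<&'a str> {
    topic.strip_prefix(prefix)
}
```

With a `&'static str` topic and an empty prefix, the result is still `Cow::Borrowed`, which is the case described for most of the Cumulocity rules.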
#[derive(Debug, thiserror::Error)]
pub enum InvalidBridgeRule {
    #[error("{0:?} is not a valid MQTT bridge topic prefix as is missing a trailing slash")]

Suggested change:
- #[error("{0:?} is not a valid MQTT bridge topic prefix as is missing a trailing slash")]
+ #[error("{0:?} is not a valid MQTT bridge topic prefix as it is missing a trailing slash")]
}

#[derive(Debug, Clone)]
pub struct BridgeRule {
It would be good to document this struct with some examples for future users not familiar with the Mosquitto bridge, especially cases like the empty topic filter, which wasn't clear to me at first look.
fn forwards_unfiltered_topic() {
    let cloud_topic = "thinedge/devices/my-device/test-connection";
    let rule =
        BridgeRule::try_new("".into(), "aws/test-connection".into(), cloud_topic.into())
So, this rule means that it only subscribes to the single topic `aws/test-connection` and none of the sub-hierarchy like `aws/test-connection/#`, right? It might be good to add an additional assertion, `assert_eq!(rule.apply("aws/test-connection/subtopic"), None)`, to highlight that.
I'm guessing that the primary use-case for defining such a rule is to allow forwarding an incoming topic to a completely unrelated outgoing topic. You could highlight that in the doc, as it wasn't clear to me on a first look.
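The behaviour asked about here can be sketched as follows. This is an illustrative model only: `SketchRule` and `matches_filter` are hypothetical names, the validation done by the real `BridgeRule::try_new` is omitted, and the argument order (filter, prefix to remove, prefix to add) is inferred from the test quoted above.

```rust
use std::borrow::Cow;

/// Hypothetical, simplified stand-in for the PR's `BridgeRule`.
struct SketchRule {
    topic_filter: Cow<'static, str>,
    prefix_to_remove: Cow<'static, str>,
    prefix_to_add: Cow<'static, str>,
}

impl SketchRule {
    /// Returns the converted topic, or `None` if the rule does not apply.
    fn apply<'a>(&self, topic: &'a str) -> Option<Cow<'a, str>> {
        // Removing the prefix only borrows a sub-slice of `topic`.
        let rest = topic.strip_prefix(&*self.prefix_to_remove)?;
        if !matches_filter(&self.topic_filter, rest) {
            return None;
        }
        Some(if self.prefix_to_add.is_empty() {
            Cow::Borrowed(rest)
        } else {
            Cow::Owned(format!("{}{rest}", self.prefix_to_add))
        })
    }
}

/// Minimal MQTT filter matching supporting `+` and `#`.
/// An empty filter matches only an empty remainder, i.e. exactly one topic.
fn matches_filter(filter: &str, topic: &str) -> bool {
    if filter.is_empty() {
        return topic.is_empty();
    }
    let mut filter_levels = filter.split('/');
    let mut topic_levels = topic.split('/');
    loop {
        match (filter_levels.next(), topic_levels.next()) {
            (Some("#"), _) => return true,
            (Some("+"), Some(_)) => {}
            (Some(f), Some(t)) if f == t => {}
            (None, None) => return true,
            _ => return false,
        }
    }
}
```

Under this model, an empty filter with `aws/test-connection` as the prefix to remove maps that one topic to an unrelated output topic, while `aws/test-connection/subtopic` yields `None`.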
}

#[test]
fn rejects_invalid_input_topic() {

Suggested change:
- fn rejects_invalid_input_topic() {
+ fn rejects_invalid_input_prefix() {
}

#[test]
fn rejects_invalid_output_topic() {

Suggested change:
- fn rejects_invalid_output_topic() {
+ fn rejects_invalid_output_prefix() {
    prefix_to_add: Cow<'static, str>,
}

fn prepend<'a>(target: Cow<'a, str>, prefix: &Cow<'a, str>) -> Cow<'a, str> {
Looking at the usages, I feel that keeping the `prefix` as the first argument would have looked more intuitive.

I'm going to replace this with the `+` operator as it's unambiguous and we clearly have opposing opinions here.
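For reference, the standard library implements `Add` for `Cow<'a, str>`, and that implementation returns the other operand unchanged when one side is empty. A small sketch of what the `+` version could look like, assuming a hypothetical `add_prefix` wrapper (the PR's final code is not shown in this thread):

```rust
use std::borrow::Cow;

/// Hypothetical wrapper: the operand order makes the result order obvious,
/// and std's `Add` impl avoids allocating when either side is empty.
fn add_prefix<'a>(prefix: Cow<'a, str>, target: Cow<'a, str>) -> Cow<'a, str> {
    prefix + target
}
```

Only the case where both the prefix and the target are non-empty produces an owned `String`.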
    }
}

mod single_converter {

Suggested change:
- mod single_converter {
+ mod topic_converter {
    prefix_to_remove: Cow<'static, str>,
    prefix_to_add: Cow<'static, str>,
Just an optional suggestion:
- prefix_to_remove: Cow<'static, str>,
- prefix_to_add: Cow<'static, str>,
+ incoming_prefix: Cow<'static, str>,
+ outgoing_prefix: Cow<'static, str>,
I prefer the original names compared to those proposed, even if not perfect.
A `BridgeRule` can be `local_to_remote` or `remote_to_local`, so the incoming and outgoing terms are confusing. The add and remove internal usage hints are okay as these names are only used privately.

Yeah, I also intentionally avoided the names `local_prefix` and `remote_prefix` for the same reason. Suggested `incoming` and `outgoing` more from the bridge client's perspective, more like `input_prefix` and `output_prefix`. But yeah, it was just an optional suggestion and can be ignored.
// topic to interact with the shadow of the device
bridge.forward_from_local("shadow/#", local_prefix, things_prefix.clone())?;
bridge.forward_from_remote("shadow/#", local_prefix, things_prefix)?;
How do we ensure this doesn't lead to an infinite loop?

I've added some de-duplication logic, which will detect messages that are identical to ones we have published and avoid republishing them. With MQTT v3.1, I believe this is the best we can do to avoid infinite loops, as we can't instruct the broker not to send us the messages we publish.
On a related note, it may be that we simply don't need this to be bidirectional, or we could split the incoming/outgoing topics. I don't really know anything about the AWS mapper, so I'm not sure how doable this is.
The current logic only affects the bidirectional topics, so there shouldn't be any issues for c8y/az arising from the de-duplication, and most of aws should be completely unaffected. There are also likely more robust ways to achieve this: it currently assumes messages will be delivered in the order sent, even across topics. But we also only have one bidirectional topic for the whole of thin-edge, so at this point it doesn't really matter.
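One way to picture the de-duplication described above is a queue of messages the bridge has forwarded on a bidirectional topic, checked against whatever the broker sends back. The sketch below is an assumption-laden illustration, not the PR's implementation: `LoopBreaker` and its methods are hypothetical names, and it relies on the same ordering assumption mentioned in the comment.

```rust
use std::collections::VecDeque;

/// Hypothetical helper: remembers messages we forwarded on bidirectional
/// topics so that their echoes from the broker are not forwarded again.
#[derive(Default)]
struct LoopBreaker {
    forwarded: VecDeque<(String, Vec<u8>)>,
}

impl LoopBreaker {
    /// Call when the bridge publishes a message on a bidirectional topic.
    fn record_published(&mut self, topic: &str, payload: &[u8]) {
        self.forwarded.push_back((topic.to_owned(), payload.to_vec()));
    }

    /// Call for each received message; `true` means "this is our own
    /// message coming back, drop it". Assumes in-order delivery, so only
    /// the oldest recorded message is compared.
    fn is_echo(&mut self, topic: &str, payload: &[u8]) -> bool {
        let echo = matches!(
            self.forwarded.front(),
            Some((t, p)) if t == topic && p.as_slice() == payload
        );
        if echo {
            self.forwarded.pop_front();
        }
        echo
    }
}
```

A genuinely new message that happens to be identical to a pending echo would be dropped by this scheme, which is one reason splitting the incoming and outgoing topics could be more robust.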
}

#[derive(Debug, Clone)]
pub struct BridgeRule {

Suggested change:
- pub struct BridgeRule {
+ struct BridgeRule {
I still have to review the bulk of the changes, notably in `tests/bridge.rs`.
// TODO should this be an error
.unwrap_or_else(|| panic!("Failed to convert {topic:?} with {:?}", self.0))
As long as the bridge rules are hard-coded in the mapper, it's okay to panic here.
let (local_client, local_event_loop) = AsyncClient::new(local_config, 10);
let (cloud_client, cloud_event_loop) = AsyncClient::new(cloud_config, 10);
`sudo tedge connect c8y --test` returned an unexpected exit code stdout: Sending packets to check connection.
I observe on my laptop the same behavior as on the failing system tests: no JWT token is received when requested on `c8y/s/uat`:
- The message published on `c8y/s/uat` is not even processed by the c8y built-in bridge.
- The built-in bridge is even stuck: processing no more messages after a first batch of 11 messages.
- Changing the `cap` parameter from 10 to some crazy number 10000 unblocks the built-in bridge (this change has to be done for the `local_config`, the `cloud_config` and the `bidirectional_channel`).

I've now understood and fixed the issue here. The issue was the change to the subscriptions. Previously we were subscribing before connecting, but with this PR, we subscribe upon receiving a `ConnAck`, which means we resubscribe after a dropped connection.
The issue with this is the cloud connection takes a reasonable period of time to come up after starting the mapper, and if during that time 10 messages are sent, this fills up the channel for the cloud event loop. When the cloud then successfully connects, it attempts to subscribe with `recv_client.subscribe_many(&topics).await`. This attempts to send a message to the event loop to process the subscription request, but the event loop is already full, so this results in a deadlock.
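The mechanism can be illustrated with a plain bounded channel from the standard library. This is an analogy under stated assumptions, not rumqttc's internals: `fill_then_subscribe` is a hypothetical function, and `try_send` is used so the example reports the full channel instead of blocking forever as an awaited send would.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical model: `cap` publish requests are queued while the cloud
/// connection is still coming up, then a subscribe request is attempted.
fn fill_then_subscribe(cap: usize) -> Result<(), TrySendError<&'static str>> {
    // The receiver plays the role of the event loop; it is kept alive but
    // never drained, just as the event loop makes no progress while the
    // task that polls it is waiting to enqueue the subscription.
    let (requests, _event_loop) = sync_channel::<&'static str>(cap);
    for _ in 0..cap {
        requests
            .try_send("publish")
            .expect("channel still has room");
    }
    // A blocking or awaited send would wait here indefinitely.
    requests.try_send("subscribe")
}
```

With `cap` set to 10 the subscribe request is rejected as `Full`, which matches the observation that raising the capacity to 10000 merely hides the problem rather than removing it.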
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
…ion tests Signed-off-by: James Rhodes <jarhodes314@gmail.com>
4fec17e to bbca44d
Approved.
I really like the use of `cargo mutants` to improve code coverage. The result gives strong confidence in this tricky piece of code.
$ cargo mutants --test-tool=nextest -p tedge_mqtt_bridge
Found 92 mutants to test
ok Unmutated baseline in 39.6s build + 0.4s test
INFO Auto-set test timeout to 20s
92 mutants tested in 6m 48s: 46 caught, 46 unviable
}

#[async_trait::async_trait]
#[mutants::skip]
So `mutants::skip` removes one missed test. But is this a good idea? Wouldn't it be better to know that test coverage can still be improved?
I would at least add the `cargo mutants` output as a comment:
- #[mutants::skip]
+ #[mutants::skip] // missed: replace <impl MqttAck for AsyncClient>::ack -> Result<(), ClientError> with Ok(())
#[mutants::skip]
impl Actor for MqttBridgeActor {
    fn name(&self) -> &str {

Suggested change:
- #[mutants::skip]
- impl Actor for MqttBridgeActor {
-     fn name(&self) -> &str {
+ impl Actor for MqttBridgeActor {
+     #[mutants::skip]
+     fn name(&self) -> &str {
Proposed changes
This adds support for AWS and Azure to the built-in bridge, as well as some clean session configuration to ensure we don't lose messages.
Previously the rules were Cumulocity rules, where all we needed to do was remove and add local prefixes. A large part of the changes therefore supports more complex mapping logic: a `BridgeRule` struct has been added to support the same configurations as mosquitto.
Types of changes
Paste Link to the issue
Checklist
- `cargo fmt` as mentioned in CODING_GUIDELINES
- `cargo clippy` as mentioned in CODING_GUIDELINES
Further comments