[#2760] Improved handling of MQTT adapter shutdown. #3386
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client-telemetry</artifactId>
</dependency>
<dependency>
Severe Vulnerability:
pkg:maven/org.eclipse.hono/hono-client-telemetry-kafka@2.2.0-SNAPSHOT
0 Critical, 1 Severe, 0 Moderate, 0 Unknown vulnerabilities have been found across 1 dependencies
Components
pkg:maven/io.netty/netty-handler@4.1.78.Final
SEVERE Vulnerabilities (1)
Channel Accessible by Non-Endpoint
CVSS Score: 6.5
CVSS Vector: CVSS:3.1/AV:N/AC:H/PR:N/UI:N/S:U/C:H/I:L/A:N
CWE: CWE-300
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client-telemetry-kafka</artifactId>
</dependency>
<dependency>
Severe Vulnerability:
pkg:maven/org.eclipse.hono/hono-client-telemetry-amqp@2.2.0-SNAPSHOT
0 Critical, 1 Severe, 0 Moderate, 0 Unknown vulnerabilities have been found across 1 dependencies
Components
pkg:maven/io.netty/netty-handler@4.1.78.Final
SEVERE Vulnerabilities (1)
Channel Accessible by Non-Endpoint
CVSS Score: 6.5
CVSS Vector: CVSS:3.1/AV:N/AC:H/PR:N/UI:N/S:U/C:H/I:L/A:N
CWE: CWE-300
Introduced unregister-cmd-consumers method at Command Router API. That is a batch version of unregister-cmd-consumer. The API is used only while the adapter is stopping or device is disconnected. Signed-off-by: Nikolay Deliyski <Nikolay.Deliyski@bosch.io>
final Boolean sendEvent = Optional.ofNullable(AmqpUtils.getApplicationProperty(
        request,
        MessageHelper.APP_PROPERTY_SEND_EVENT,
        Boolean.class)).orElse(false);
FMPOV we should put this into a private method returning a primitive boolean so that there is no need to bother if it is null
...
agree, will do.
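The suggestion above could look roughly like the following sketch. It is not the code from this PR: a plain `Map` stands in for the AMQP application properties, and the class name `SendEventFlag` and the key `"send_event"` are assumptions made for illustration only.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; a plain Map stands in for the AMQP application properties.
final class SendEventFlag {

    // Assumed key, standing in for MessageHelper.APP_PROPERTY_SEND_EVENT.
    static final String APP_PROPERTY_SEND_EVENT = "send_event";

    private SendEventFlag() {
    }

    /**
     * Returns the flag as a primitive boolean.
     * A missing map, a missing entry or a non-Boolean value all yield false,
     * so callers never have to deal with null.
     */
    static boolean isSendEvent(final Map<String, Object> applicationProperties) {
        return Optional.ofNullable(applicationProperties)
                .map(props -> props.get(APP_PROPERTY_SEND_EVENT))
                .filter(Boolean.class::isInstance)
                .map(Boolean.class::cast)
                .orElse(false);
    }
}
```

Because the method returns a primitive, the call site can use the result directly in an `if` without a null check.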
 * @param spanContext The span context representing the request to be processed.
 * @return The response to send to the client via the event bus.
 */
protected Future<Message> processUnregisterCommandConsumers(final Message request,
can we rename this to e.g. processUnregisterMultipleCommandConsumers so that it is easier to distinguish from the existing processUnregisterCommandConsumer method? Maybe we should even rename the existing method to processUnregisterSingleCommandConsumer ...
Agreed, having Single and Multiple as part of the name is easier to read.
What about the CommandRouterAction enum? Does it make sense to use the same naming for the enum constants?
    return finishSpanOnFutureCompletion(span, resultFuture);
}

private Future<List<CommandRouterDeviceInfo>> parseDeviceInfo(final Message request) {
static ?
@@ -77,6 +79,7 @@ Future<CommandRouterResult> setLastKnownGatewayForDevice(String tenantId, Map<St
 *
 * @param tenantId The tenant id.
 * @param deviceId The device id.
 * @param sendEvent {@code true} if <em>connected notification</em> event should be sent.
Is there a case where this will be `false`? My understanding of the discussion in #2760 is that we want to completely move the responsibility for sending the connected/disconnected events to the Command Router, or am I mistaken?
As agreed with @calohmn, this PR is about the MQTT adapter only. For the moment, the other adapters will use `false`.
Later, other PRs could adjust the remaining adapters. In general, I think the direction is to move that responsibility to the Command Router for all adapters, as there is no difference between them in that respect.
final Span span,
final boolean sendDisconnectedEvent) {
final Pair<CommandSubscription, CommandConsumer> subscriptionConsumerPair,
final boolean sendDisconnectedEvent, final Span span) {
formatting: span param in new line.
})
.compose(v -> {
    if (sendDisconnectedEvent) {
        return sendDisconnectedTtdEvent(subscription.getTenant(), subscription.getDeviceId(), span);
Since no event gets sent here anymore, the log output a few lines above should be adapted, dropping the `skip sending disconnected event` part.
final Span sendEventSpan = newChildSpan(span.context(), "send Disconnected Event");
return AbstractVertxBasedMqttProtocolAdapter.this.sendDisconnectedTtdEvent(tenant, device, authenticatedDevice, sendEventSpan.context())
        .onComplete(r -> sendEventSpan.finish()).mapEmpty();
private Future<Void> onCommandSubscriptionRemovedViaRelease(
I think a method name `releaseCommandConsumer` would fit better here. The first param could be changed to contain the `CommandConsumer`.
@@ -33,4 +33,34 @@ public interface CommandConsumer {
 * closed/overwritten already.
 */
Future<Void> close(SpanContext spanContext);
I think the changes in this class would better go into a new `CommandConsumer` class in the adapter-base package.
This lets us differentiate between CommandConsumers in the adapter classes, which need the added functionality, and other CommandConsumer usages, where only `close(SpanContext)` is needed.
FMPOV, the `close(SpanContext)` method in the new class should then be removed, so that the `close` method with the `sendEvent` parameter always has to be used, making it more explicit what the behaviour should be.
Do we need to use the same name `CommandConsumer` for the old and the new interface?
There are two `CommandConsumerFactory` types too, which I find a bit misleading.
What about `DeviceCommandConsumer` (new) and `TenantCommandConsumer` (old)?
 * Indicates the consumer is not needed any more. The actual closing is up to the implementation.
 *
A note could be added here, that the default implementation calls the `close` method.
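To make the interface discussion in the two threads above concrete, here is a minimal sketch of an adapter-side consumer interface whose only `close` variant takes the `sendEvent` flag and whose release method delegates to it by default. The name `DeviceCommandConsumer` is the one proposed in the reply above; the method name `release`, the use of `CompletableFuture` instead of a Vert.x `Future`, and the omission of the `SpanContext` parameter are simplifying assumptions, not the PR's actual code.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical interface, named after the proposal in this thread.
interface DeviceCommandConsumer {

    /**
     * Closes the consumer.
     *
     * @param sendEvent true if a disconnected notification event should be sent.
     */
    CompletableFuture<Void> close(boolean sendEvent);

    /**
     * Indicates the consumer is not needed any more.
     * The default implementation calls the close method.
     */
    default CompletableFuture<Void> release(final boolean sendEvent) {
        return close(sendEvent);
    }
}

// Minimal implementation that only records how close() was invoked.
final class RecordingCommandConsumer implements DeviceCommandConsumer {

    int closeCalls;
    boolean lastSendEvent;

    @Override
    public CompletableFuture<Void> close(final boolean sendEvent) {
        closeCalls++;
        lastSendEvent = sendEvent;
        return CompletableFuture.completedFuture(null);
    }
}
```

With no parameterless `close` available, every call site has to state whether an event is wanted, which is the explicitness the review comment asks for.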
/**
 * <em>notification</em> event flag.
 *
 * @return {@code true} if <em>connected notification</em> event should be sent.
connected or disconnected
if (sendEvent) {
    return tenantClient.get(tenantId, span.context()).compose(
            tenantObject -> sendDisconnectedTtdEvent(tenantObject, deviceId, adapterInstanceId,
If `sendEvent` is true, the event should also be sent if `removeCommandHandlingAdapterInstance` failed. Sending should only be skipped if there was a `HTTP_PRECON_FAILED` error status on `removeCommandHandlingAdapterInstance`.
See the logic in the old `AbstractVertxBasedMqttProtocolAdapter.onCommandSubscriptionRemoved()` method.
Right, I have missed that flow. Will fix that.
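The rule described in the comment above can be reduced to a small decision function. The sketch below is an illustration only: the class and method names are hypothetical, and the failure of `removeCommandHandlingAdapterInstance` is modelled as a nullable status code rather than as a failed future.

```java
import java.net.HttpURLConnection;

// Hypothetical helper capturing only the decision, not the actual event sending.
final class DisconnectedEventPolicy {

    private DisconnectedEventPolicy() {
    }

    /**
     * Decides whether a disconnected event should be sent.
     *
     * @param sendEvent whether the caller requested the event.
     * @param removalFailureStatus the error status of a failed removal of the
     *        adapter instance entry, or null if the removal succeeded.
     */
    static boolean shouldSendDisconnectedEvent(final boolean sendEvent, final Integer removalFailureStatus) {
        if (!sendEvent) {
            return false;
        }
        // The event is sent on success and on any failure other than 412 (precondition failed).
        return removalFailureStatus == null
                || removalFailureStatus != HttpURLConnection.HTTP_PRECON_FAILED;
    }
}
```

Keeping the decision separate from the asynchronous plumbing makes the "send unless precondition failed" branch easy to cover in a unit test.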
}

/**
 * Verifies that a client invocation of the <em>unregister-command-consumera</em> operation fails if the command
typo: consumera
 */
public void setBatchSize(final int batchSize) {
    if (batchSize < 0) {
        throw new IllegalArgumentException("batch size could not be negative");
must not be
 */
public void setBatchMaxTimeout(final long batchMaxTimeout) {
    if (batchMaxTimeout < 0) {
        throw new IllegalArgumentException("batch max timeout could not be negative");
must not be
removeCommandConsumers(pendingConsumersToClose.size(), span.context())
        .onComplete(v -> span.finish());
// mark the timer as finished although the batch operation is still ongoing
batchTimerId = -1;
`-1` isn't defined as an impossible timer id value. Therefore, I think it's better to use a `Boolean` and assign `null` here.
ok, I will adapt.
final CommandRouterDeviceInfo crdi = pendingConsumersToClose.remove(0);
toSend.add(CommandRouterDeviceInfo.of(crdi.tenantId(), crdi.deviceId(), crdi.sendEvent()));
Is it really needed to create a `CommandRouterDeviceInfo` copy here?
Good catch, it is not needed; it seems to be leftover code that was not cleaned up.
    }
}

private Future<Void> removeCommandConsumers(final int count, final SpanContext spanContext) {
Currently, `count` is always set to `pendingConsumersToClose.size()` here, making the parameter look superfluous.
Just looking at the code of this method, it should also be made more obvious that `pendingConsumersToClose.remove(0)` below can't produce an `IndexOutOfBoundsException`.
I will improve.
At least the first call should use batchSize instead of pendingConsumersToClose.size() as first parameter.
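One way to make the bounds safety raised in this thread obvious is to clamp the number of removed entries to the current list size before touching the list. The following is a sketch under that assumption; `PendingBatch` and `drain` are hypothetical names and the element type is left generic instead of using `CommandRouterDeviceInfo`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for taking a bounded batch from the head of a pending list.
final class PendingBatch {

    private PendingBatch() {
    }

    /**
     * Removes and returns at most maxItems entries from the head of the given
     * (mutable) list. The count is clamped to the list size, so no
     * IndexOutOfBoundsException can occur.
     */
    static <T> List<T> drain(final List<T> pending, final int maxItems) {
        final int count = Math.min(Math.max(maxItems, 0), pending.size());
        final List<T> head = pending.subList(0, count);
        final List<T> batch = new ArrayList<>(head);
        // clearing the sub-list view removes the drained range from the backing list
        head.clear();
        return batch;
    }
}
```

Calling `drain(pendingConsumersToClose, batchSize)` would then cover both remarks: the parameter carries the batch size rather than the list size, and the entries are moved without an extra copy per element.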
if (pendingConsumersToClose.size() >= batchSize) {
    return removeCommandConsumers(pendingConsumersToClose.size(), onReleaseSpanContext);
How about cancelling an existing timer here? Otherwise that timer execution could cause an `unregisterCommandConsumers` request with only a few items, being sent ahead of `batchMaxTimeout`.
It makes sense, will adapt.
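The timer handling suggested here can be sketched as follows. Everything in this block is an assumption made for illustration: `TimerService` is a hypothetical abstraction loosely modelled on a set/cancel timer pair, `UnregisterBatcher` is not the class from this PR, and the code expects to run on a single thread (for example one event loop). It tracks the running timer with a nullable `Long` instead of a magic `-1`, which is only one possible way to address the earlier remark about timer ids.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical timer abstraction so that the sketch does not depend on any framework.
interface TimerService {
    long setTimer(long delayMillis, Runnable task);
    boolean cancelTimer(long timerId);
}

// Collects items and hands them to the sender either when batchSize is reached
// or when the timeout timer fires, whichever comes first. Not thread-safe.
final class UnregisterBatcher<T> {

    private final TimerService timers;
    private final int batchSize;
    private final long batchMaxTimeoutMillis;
    private final Consumer<List<T>> sender;
    private final List<T> pending = new ArrayList<>();
    // null means "no timer running"
    private Long timerId;

    UnregisterBatcher(final TimerService timers, final int batchSize,
            final long batchMaxTimeoutMillis, final Consumer<List<T>> sender) {
        this.timers = timers;
        this.batchSize = batchSize;
        this.batchMaxTimeoutMillis = batchMaxTimeoutMillis;
        this.sender = sender;
    }

    void add(final T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            // cancel a running timer so it cannot trigger an early, tiny batch later on
            if (timerId != null) {
                timers.cancelTimer(timerId);
                timerId = null;
            }
            flush();
        } else if (timerId == null) {
            timerId = timers.setTimer(batchMaxTimeoutMillis, () -> {
                timerId = null;
                flush();
            });
        }
    }

    private void flush() {
        if (!pending.isEmpty()) {
            final List<T> batch = new ArrayList<>(pending);
            pending.clear();
            sender.accept(batch);
        }
    }
}

// Test double: timers only fire when fireAll() is invoked explicitly.
final class ManualTimerService implements TimerService {

    private final Map<Long, Runnable> tasks = new HashMap<>();
    private long nextId;

    @Override
    public long setTimer(final long delayMillis, final Runnable task) {
        tasks.put(nextId, task);
        return nextId++;
    }

    @Override
    public boolean cancelTimer(final long timerId) {
        return tasks.remove(timerId) != null;
    }

    int activeTimers() {
        return tasks.size();
    }

    void fireAll() {
        final List<Runnable> due = new ArrayList<>(tasks.values());
        tasks.clear();
        due.forEach(Runnable::run);
    }
}
```

Without the cancellation, the timer started for the first item would still fire after the size-triggered flush and could send whatever few items had arrived in the meantime, well before their own timeout had elapsed.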
@@ -69,16 +80,24 @@ public class CommandRouterCommandConsumerFactory implements CommandConsumerFacto
 *
 * @param commandRouterClient The client to use for accessing the command router service.
 * @param adapterName The name of the protocol adapter.
 * @param tracer The Tracer to use for injecting the context.
 * @param batchSize The number of pending operations that trigger their processing as part of a batch operation.
 * @param batchMaxTimeout The maximum period of time after which an operation is processed as part of a batch operation.
Time unit should be mentioned here.
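Taken together, the wording remarks on the two setters and the request to name the time unit could result in something like the sketch below. The class name `BatchSettings` is hypothetical, and milliseconds are an assumption: the review does not say which unit the PR actually uses.

```java
// Hypothetical settings holder illustrating the requested wording and documentation.
final class BatchSettings {

    private int batchSize;
    private long batchMaxTimeout;

    /**
     * Sets the number of pending operations that trigger a batch operation.
     *
     * @param batchSize The batch size.
     * @throws IllegalArgumentException if the batch size is negative.
     */
    void setBatchSize(final int batchSize) {
        if (batchSize < 0) {
            throw new IllegalArgumentException("batch size must not be negative");
        }
        this.batchSize = batchSize;
    }

    /**
     * Sets the maximum period of time after which pending operations are processed.
     *
     * @param batchMaxTimeout The timeout in milliseconds (unit assumed for this sketch).
     * @throws IllegalArgumentException if the timeout is negative.
     */
    void setBatchMaxTimeout(final long batchMaxTimeout) {
        if (batchMaxTimeout < 0) {
            throw new IllegalArgumentException("batch max timeout must not be negative");
        }
        this.batchMaxTimeout = batchMaxTimeout;
    }

    int getBatchSize() {
        return batchSize;
    }

    long getBatchMaxTimeout() {
        return batchMaxTimeout;
    }
}
```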
So, let's say we have 10.000 devices connected to an MQTT adapter instance and we want to shut it down. How many
One of the original ideas was to fully skip the I think, to further improve on this "adapter-shutdown with 10.000 subscribed devices" case, we could let the adapter send maybe just one or two big batch
We seem to agree that we need to make sure that whatever mechanism we devise, it works for both the graceful shutdown of an adapter as well as the crashing of an adapter. FMPOV this means that the mechanism can not require the adapter to unregister the command consumer for all connected devices before stopping/crashing, or am I mistaken?
Not necessarily, FMPOV. I think we have to consider the implications of letting the tasks here be done without any trigger from the adapter itself. The trigger would have to come from the KubernetesBasedAdapterInstanceStatusService instead. That service can detect pods having gone offline, but only after a considerable delay to prevent false positives (see #3379). That delay could be higher than what we want the disconnected event sending delay to be. Thinking further about such an approach:
EDIT:
The functionality will be implemented in #3422.