Skip to content

Commit

Permalink
Add support for session expiry (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
cmdjulian authored Apr 11, 2024
1 parent f0125bd commit fbcaff6
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 1 deletion.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ mqtt.host=test.mosquitto.org
mqtt.port=1883
# The clientId to use when connecting (random by default).
mqtt.client-id=test
# The session expiry interval in seconds, has to be in [0, 4294967295] (0 by default). Only for mqtt 5.
mqtt.session-expiry=0
# The username to use when connecting.
mqtt.username=admin
# The password to use when connecting.
Expand Down
6 changes: 5 additions & 1 deletion src/main/kotlin/de/smartsquare/starter/mqtt/MqttConnector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class Mqtt5Connector(

val connectOptions = Mqtt5Connect.builder()
.cleanStart(config.clean)
.sessionExpiryInterval(config.sessionExpiry)
.build()

logger.info("Connecting to ${if (username != null) "$username@" else ""}$host:$port using mqtt 5...")
Expand All @@ -153,7 +154,10 @@ class Mqtt5Connector(
}

override fun stop(callback: Runnable) {
client.disconnect().thenRun(callback)
client.disconnectWith()
.sessionExpiryInterval(config.sessionExpiry)
.send()
.thenRun(callback)
}

override fun isRunning() = client.state != MqttClientState.DISCONNECTED
Expand Down
11 changes: 11 additions & 0 deletions src/main/kotlin/de/smartsquare/starter/mqtt/MqttProperties.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.smartsquare.starter.mqtt

import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect
import jakarta.validation.constraints.Max
import jakarta.validation.constraints.Min
import jakarta.validation.constraints.NotEmpty
Expand Down Expand Up @@ -71,6 +72,16 @@ data class MqttProperties(
* The shutdown configuration for the mqtt processor.
*/
val shutdown: MqttShutdown = MqttShutdown.GRACEFUL,

/**
* The session expiry interval in seconds. Has to be in [0, 4294967295] (0 by default).
* Setting the value to 0 means the session will expire immediately after disconnect.
* Setting it to 4_294_967_295 means the session will never expire.
* This setting is only going into effect for MQTT 5.
*/
@get:Min(0)
@get:Max(Mqtt5Connect.NO_SESSION_EXPIRY)
val sessionExpiry: Long = Mqtt5Connect.DEFAULT_SESSION_EXPIRY_INTERVAL,
) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
"type": "de.smartsquare.starter.mqtt.MqttProperties.MqttShutdown",
"description": "The shutdown configuration for the mqtt processor.",
"defaultValue": "graceful"
},
{
"name": "mqtt.sessionExpiry",
"type": "java.lang.Long",
"description": "The session expiry configuration for the mqtt processor in seconds. Using 0 expires the session immediately after disconnect. Using the max value of 4294967295 marks the session as never expiring. Has to be in [0..4294967295]. This setting is only going into effect for MQTT 5.",
"defaultValue": 0
}
],
"hints": [
Expand Down
12 changes: 12 additions & 0 deletions src/test/kotlin/de/smartsquare/starter/mqtt/MqttPropertiesTest.kt
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package de.smartsquare.starter.mqtt

import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect
import org.amshove.kluent.shouldBeEmpty
import org.amshove.kluent.shouldContain
import org.amshove.kluent.shouldHaveSize
import org.amshove.kluent.shouldStartWith
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.validation.ValidationAutoConfiguration
import org.springframework.boot.test.context.SpringBootTest
Expand Down Expand Up @@ -36,6 +39,7 @@ class MqttPropertiesTest {
clean = false,
group = "group",
version = 5,
sessionExpiry = 1000,
),
)

Expand All @@ -56,6 +60,14 @@ class MqttPropertiesTest {
errors.allErrors.shouldHaveSize(1)
}

@ParameterizedTest
@ValueSource(longs = [-1, Mqtt5Connect.NO_SESSION_EXPIRY + 1])
fun `validates session expiry`(expiry: Long) {
val errors = validator.validateObject(MqttProperties(host = "localhost", port = 1883, sessionExpiry = expiry))

errors.allErrors.shouldHaveSize(1)
}

@Test
fun `validates mqtt version`() {
val errors = validator.validateObject(MqttProperties(host = "localhost", port = 10000, version = 2))
Expand Down

0 comments on commit fbcaff6

Please sign in to comment.