From e664666918280449c6c6f0a2d0e7818e5732e66b Mon Sep 17 00:00:00 2001 From: Arnaldo Cesco Date: Mon, 11 Sep 2023 15:24:34 +0200 Subject: [PATCH] Do not deadlock when disconnection corner case happens If a device were to disconnect from the broker before having sent its introspection, the connection lost handler would deadlock trying to acquire the lock on the inflight message queue which is still locked. Fix this by moving the first lock to the connection handler, and check that another lock attempt is performed when connection is lost only if the device is not allowed to reconnect. This still guarantees that the inflight message queue lock is released only when (eventually) the introspection is sent. Signed-off-by: Arnaldo Cesco --- device/device.go | 12 +++++------- device/protocol_mqtt_v1.go | 27 +++++++++++++++++++++------ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/device/device.go b/device/device.go index d69776c..559ad15 100644 --- a/device/device.go +++ b/device/device.go @@ -49,7 +49,7 @@ type Device struct { astarteAPIClient *client.Client brokerURL string db *gorm.DB - inflightMessages messageQueue + inflightMessages *messageQueue isSendingStoredMessages bool volatileMessages []astarteMessageInfo lastSentIntrospection string @@ -212,6 +212,10 @@ func (d *Device) Connect(result chan<- error) { return } + // Now that the client is up and running, we can start sending messages + value := messageQueue{queue: make(chan astarteMessageInfo, d.opts.MaxInflightMessages)} + d.inflightMessages = &value + // initialize the client if err = d.initializeMQTTClient(); err != nil { if result != nil { @@ -237,12 +241,6 @@ func (d *Device) Connect(result chan<- error) { return } - // Now that the client is up and running, we can start sending messages - d.inflightMessages.queue = make(chan astarteMessageInfo, d.opts.MaxInflightMessages) - // When initialized, mutexes are unlocked (see https://pkg.go.dev/sync#Mutex), - // so we lock it in order to allow publishing messages - // only if introspection has already been sent - d.inflightMessages.Lock() go d.sendLoop() // All good: notify, and our routine is over. diff --git a/device/protocol_mqtt_v1.go b/device/protocol_mqtt_v1.go index 8f5f127..70db297 100644 --- a/device/protocol_mqtt_v1.go +++ b/device/protocol_mqtt_v1.go @@ -61,8 +61,13 @@ func (d *Device) initializeMQTTClient() error { }) opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { - // If connection is lost, we should stop dequeuing messages from the channel - d.inflightMessages.Lock() + // Check d.opts.AutoReconnect in order not to lock twice. + if !d.opts.AutoReconnect { + // When connection is lost, we should stop dequeuing messages from the channel. + // If we're not autoreconnecting, lock here rather than during reconnection + // because reconnection will never happen. + d.inflightMessages.Lock() + } if d.OnErrors != nil { d.OnErrors(d, err) } @@ -204,6 +209,12 @@ func (d *Device) handleControlMessages(message string, payload []byte) error { } func astarteOnConnectHandler(d *Device, sessionPresent bool) { + // Check d.opts.AutoReconnect in order not to lock twice. + if d.opts.AutoReconnect { + // When connection is lost, we should stop dequeuing messages from the channel. + // This will guarantee that messages are dequeued only after introspection is sent. + d.inflightMessages.Lock() + } // Generate Introspection first introspection := d.generateDeviceIntrospection() @@ -259,7 +270,7 @@ func astarteOnConnectHandler(d *Device, sessionPresent bool) { } } - // Since control messages have been sent, we allow to send data + // Since control messages have been sent, we allow to send data once again. d.inflightMessages.Unlock() // If some messages must be retried, do so @@ -394,12 +405,16 @@ func (d *Device) UnsetProperty(interfaceName, path string) error { // The main publishing loop: retrieves messages from the publishing channel and sends them one at a time, in order func (d *Device) sendLoop() { for { - d.inflightMessages.Lock() - d.publishMessage(<-d.inflightMessages.queue) - d.inflightMessages.Unlock() + d.publishInflightMessage() } } +func (d *Device) publishInflightMessage() { + d.inflightMessages.Lock() + defer d.inflightMessages.Unlock() + d.publishMessage(<-d.inflightMessages.queue) +} + func (d *Device) hasAlreadySentPropertyValue(interfaceName, path string, newValue interface{}) bool { // the path has already been validated oldValue, err := d.GetProperty(interfaceName, path)