Skip to content

Commit

Permalink
Do not deadlock when disconnection corner case happens
Browse files Browse the repository at this point in the history
If a device were to disconnect from the broker before having sent
its introspection, the connection lost handler would deadlock
trying to acquire the lock on the inflight message queue which is
still locked.
Fix this by moving the first lock to the connection handler,
and check that another lock attempt is performed when connection
is lost only if the device is not allowed to reconnect.
This still guarantees that the inflight message queue lock is
released only when (eventually) the introspection is sent.

Signed-off-by: Arnaldo Cesco <arnaldo.cesco@secomind.com>
  • Loading branch information
Annopaolo committed Sep 14, 2023
1 parent 3cdbb55 commit e664666
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
12 changes: 5 additions & 7 deletions device/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Device struct {
astarteAPIClient *client.Client
brokerURL string
db *gorm.DB
inflightMessages messageQueue
inflightMessages *messageQueue
isSendingStoredMessages bool
volatileMessages []astarteMessageInfo
lastSentIntrospection string
Expand Down Expand Up @@ -212,6 +212,10 @@ func (d *Device) Connect(result chan<- error) {
return
}

// Now that the client is up and running, we can start sending messages
value := messageQueue{queue: make(chan astarteMessageInfo, d.opts.MaxInflightMessages)}
d.inflightMessages = &value

// initialize the client
if err = d.initializeMQTTClient(); err != nil {
if result != nil {
Expand All @@ -237,12 +241,6 @@ func (d *Device) Connect(result chan<- error) {
return
}

// Now that the client is up and running, we can start sending messages
d.inflightMessages.queue = make(chan astarteMessageInfo, d.opts.MaxInflightMessages)
// When initialized, mutexes are unlocked (see https://pkg.go.dev/sync#Mutex),
// so we lock it in order to allow publishing messages
// only if introspection has already been sent
d.inflightMessages.Lock()
go d.sendLoop()

// All good: notify, and our routine is over.
Expand Down
27 changes: 21 additions & 6 deletions device/protocol_mqtt_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,13 @@ func (d *Device) initializeMQTTClient() error {
})

opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
// If connection is lost, we should stop dequeuing messages from the channel
d.inflightMessages.Lock()
// Check d.opts.AutoReconnect in order not to lock twice.
if !d.opts.AutoReconnect {
// When connection is lost, we should stop dequeuing messages from the channel.
// If we're not autoreconnecting, lock here rather than during reconnection
// because reconnection will never happen.
d.inflightMessages.Lock()
}
if d.OnErrors != nil {
d.OnErrors(d, err)
}
Expand Down Expand Up @@ -204,6 +209,12 @@ func (d *Device) handleControlMessages(message string, payload []byte) error {
}

func astarteOnConnectHandler(d *Device, sessionPresent bool) {
// Check d.opts.AutoReconnect in order not to lock twice.
if d.opts.AutoReconnect {
// When connection is lost, we should stop dequeuing messages from the channel.
// This will guarantee that messages are dequeued only after introspection is sent.
d.inflightMessages.Lock()
}
// Generate Introspection first
introspection := d.generateDeviceIntrospection()

Expand Down Expand Up @@ -259,7 +270,7 @@ func astarteOnConnectHandler(d *Device, sessionPresent bool) {
}
}

// Since control messages have been sent, we allow to send data
// Since control messages have been sent, we allow to send data once again.
d.inflightMessages.Unlock()

// If some messages must be retried, do so
Expand Down Expand Up @@ -394,12 +405,16 @@ func (d *Device) UnsetProperty(interfaceName, path string) error {
// The main publishing loop: retrieves messages from the publishing channel and sends them one at a time, in order
func (d *Device) sendLoop() {
for {
d.inflightMessages.Lock()
d.publishMessage(<-d.inflightMessages.queue)
d.inflightMessages.Unlock()
d.publishInflightMessage()
}
}

func (d *Device) publishInflightMessage() {
d.inflightMessages.Lock()
defer d.inflightMessages.Unlock()
d.publishMessage(<-d.inflightMessages.queue)
}

func (d *Device) hasAlreadySentPropertyValue(interfaceName, path string, newValue interface{}) bool {
// the path has already been validated
oldValue, err := d.GetProperty(interfaceName, path)
Expand Down

0 comments on commit e664666

Please sign in to comment.