Skip to content

Commit

Permalink
sub: limit the usage of the subMux
Browse files Browse the repository at this point in the history
Only use the subMux to maintain the internal data structures but not the
notification of the subscription itself.

Fixes #518
  • Loading branch information
magiconair committed Dec 4, 2021
1 parent 0b01054 commit c2ba69c
Showing 1 changed file with 27 additions and 30 deletions.
57 changes: 27 additions & 30 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,9 @@ func (c *Client) registerSubscription(sub *Subscription) error {

func (c *Client) forgetSubscription(id uint32) {
c.subMux.Lock()
defer c.subMux.Unlock()
delete(c.subs, id)
c.subMux.Unlock()

c.updatePublishTimeout()
if len(c.subs) == 0 {
c.pauseSubscriptions()
Expand Down Expand Up @@ -248,14 +249,7 @@ func (c *Client) notifySubscriptionsOfError(ctx context.Context, subID uint32, e
}
}

func (c *Client) notifySubscription(ctx context.Context, subID uint32, notif *ua.NotificationMessage) {
// we need to hold the subMux lock already
sub, ok := c.subs[subID]
if !ok {
debug.Printf("Unknown subscription: %v", subID)
return
}

func (c *Client) notifySubscription(ctx context.Context, sub *Subscription, notif *ua.NotificationMessage) {
// todo(fs): response.Results contains the status codes of which messages were
// todo(fs): were successfully removed from the transmission queue on the server.
// todo(fs): The client sent the list of ids in the *previous* PublishRequest.
Expand All @@ -265,7 +259,7 @@ func (c *Client) notifySubscription(ctx context.Context, subID uint32, notif *ua

if notif == nil {
sub.notify(ctx, &PublishNotificationData{
SubscriptionID: subID,
SubscriptionID: sub.SubscriptionID,
Error: errors.Errorf("empty NotificationMessage"),
})
return
Expand All @@ -276,7 +270,7 @@ func (c *Client) notifySubscription(ctx context.Context, subID uint32, notif *ua
// Part 4, 7.20 NotificationData parameters
if data == nil || data.Value == nil {
sub.notify(ctx, &PublishNotificationData{
SubscriptionID: subID,
SubscriptionID: sub.SubscriptionID,
Error: errors.Errorf("missing NotificationData parameter"),
})
continue
Expand All @@ -290,14 +284,14 @@ func (c *Client) notifySubscription(ctx context.Context, subID uint32, notif *ua
*ua.EventNotificationList,
*ua.StatusChangeNotification:
sub.notify(ctx, &PublishNotificationData{
SubscriptionID: subID,
SubscriptionID: sub.SubscriptionID,
Value: data.Value,
})

// Error
default:
sub.notify(ctx, &PublishNotificationData{
SubscriptionID: subID,
SubscriptionID: sub.SubscriptionID,
Error: errors.Errorf("unknown NotificationData parameter: %T", data.Value),
})
}
Expand Down Expand Up @@ -423,9 +417,22 @@ func (c *Client) publish(ctx context.Context) error {

default:
c.subMux.Lock()
// handle pending acks for all subscriptions
c.handleAcks(res.Results)
c.handleNotification(ctx, res)

sub, ok := c.subs[res.SubscriptionID]
if !ok {
// todo(fs): should we return an error here?
dlog.Printf("error: unknown subscription %d", res.SubscriptionID)
return nil
}

// handle the publish response for a specific subscription
c.handleNotification(ctx, sub, res)
c.subMux.Unlock()

c.notifySubscription(ctx, sub, res.NotificationMessage)
dlog.Printf("notif: %d", res.NotificationMessage.SequenceNumber)
}

return nil
Expand Down Expand Up @@ -464,36 +471,26 @@ func (c *Client) handleAcks(res []ua.StatusCode) {
dlog.Printf("notAcked=%v", notAcked)
}

func (c *Client) handleNotification(ctx context.Context, res *ua.PublishResponse) {
func (c *Client) handleNotification(ctx context.Context, sub *Subscription, res *ua.PublishResponse) {
dlog := debug.NewPrefixLogger("publish: sub %d: ", res.SubscriptionID)

s := c.subs[res.SubscriptionID]
if s == nil {
// todo(fs): we probably want to do something here
dlog.Printf("error: unknown subscription")
return
}

// keep-alive message
if len(res.NotificationMessage.NotificationData) == 0 {
// todo(fs): do we care about the next sequence number?
s.nextSeq = res.NotificationMessage.SequenceNumber
sub.nextSeq = res.NotificationMessage.SequenceNumber
return
}

if res.NotificationMessage.SequenceNumber != s.nextSeq {
dlog.Printf("error: got notif %d but was expecting notif %d. Data loss?", res.NotificationMessage.SequenceNumber, s.nextSeq)
if res.NotificationMessage.SequenceNumber != sub.nextSeq {
dlog.Printf("error: got notif %d but was expecting notif %d. Data loss?", res.NotificationMessage.SequenceNumber, sub.nextSeq)
}

s.lastSeq = res.NotificationMessage.SequenceNumber
s.nextSeq = s.lastSeq + 1
sub.lastSeq = res.NotificationMessage.SequenceNumber
sub.nextSeq = sub.lastSeq + 1
c.pendingAcks = append(c.pendingAcks, &ua.SubscriptionAcknowledgement{
SubscriptionID: res.SubscriptionID,
SequenceNumber: res.NotificationMessage.SequenceNumber,
})

s.c.notifySubscription(ctx, res.SubscriptionID, res.NotificationMessage)
dlog.Printf("notif: %d", res.NotificationMessage.SequenceNumber)
}

func (c *Client) sendPublishRequest() (*ua.PublishResponse, error) {
Expand Down

0 comments on commit c2ba69c

Please sign in to comment.