Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add context to subscription methods #549

Merged
merged 2 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (c *Client) monitor(ctx context.Context) {

// try to transfer all subscriptions to the new session and
// recreate them all if that fails.
res, err := c.transferSubscriptions(subIDs)
res, err := c.transferSubscriptions(ctx, subIDs)
switch {
case err != nil:
dlog.Printf("transfer subscriptions failed. Recreating all subscriptions: %v", err)
Expand Down
14 changes: 11 additions & 3 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@ import (
// Subscribe creates a Subscription with given parameters.
// Parameters that have not been set are set to their default values.
// See opcua.DefaultSubscription* constants
//
// Note: Starting with v0.5 this method will require a context
// and the corresponding XXXWithContext(ctx) method will be removed.
func (c *Client) Subscribe(params *SubscriptionParameters, notifyCh chan<- *PublishNotificationData) (*Subscription, error) {
return c.SubscribeWithContext(context.Background(), params, notifyCh)
}

// Note: Starting with v0.5 this method is superseded by the non 'WithContext' method.
func (c *Client) SubscribeWithContext(ctx context.Context, params *SubscriptionParameters, notifyCh chan<- *PublishNotificationData) (*Subscription, error) {
stats.Client().Add("Subscribe", 1)

if params == nil {
Expand All @@ -34,7 +42,7 @@ func (c *Client) Subscribe(params *SubscriptionParameters, notifyCh chan<- *Publ
}

var res *ua.CreateSubscriptionResponse
err := c.Send(req, func(v interface{}) error {
err := c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
if err != nil {
Expand Down Expand Up @@ -99,14 +107,14 @@ func (c *Client) recreateSubscription(ctx context.Context, id uint32) error {

// transferSubscriptions ask the server to transfer the given subscriptions
// of the previous session to the current one.
func (c *Client) transferSubscriptions(ids []uint32) (*ua.TransferSubscriptionsResponse, error) {
func (c *Client) transferSubscriptions(ctx context.Context, ids []uint32) (*ua.TransferSubscriptionsResponse, error) {
req := &ua.TransferSubscriptionsRequest{
SubscriptionIDs: ids,
SendInitialValues: false,
}

var res *ua.TransferSubscriptionsResponse
err := c.Send(req, func(v interface{}) error {
err := c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
return res, err
Expand Down
49 changes: 39 additions & 10 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ type PublishNotificationData struct {
func (s *Subscription) Cancel(ctx context.Context) error {
stats.Subscription().Add("Cancel", 1)
s.c.forgetSubscription(ctx, s.SubscriptionID)
return s.delete()
return s.delete(ctx)
}

// delete removes the subscription from the server.
func (s *Subscription) delete() error {
func (s *Subscription) delete(ctx context.Context) error {
req := &ua.DeleteSubscriptionsRequest{
SubscriptionIDs: []uint32{s.SubscriptionID},
}

var res *ua.DeleteSubscriptionsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})

Expand All @@ -111,7 +111,14 @@ func (s *Subscription) delete() error {
}
}

// Note: Starting with v0.5 this method will require a context
// and the corresponding XXXWithContext(ctx) method will be removed.
func (s *Subscription) Monitor(ts ua.TimestampsToReturn, items ...*ua.MonitoredItemCreateRequest) (*ua.CreateMonitoredItemsResponse, error) {
return s.MonitorWithContext(context.Background(), ts, items...)
}

// Note: Starting with v0.5 this method is superseded by the non 'WithContext' method.
func (s *Subscription) MonitorWithContext(ctx context.Context, ts ua.TimestampsToReturn, items ...*ua.MonitoredItemCreateRequest) (*ua.CreateMonitoredItemsResponse, error) {
stats.Subscription().Add("Monitor", 1)
stats.Subscription().Add("MonitoredItems", int64(len(items)))

Expand All @@ -123,7 +130,7 @@ func (s *Subscription) Monitor(ts ua.TimestampsToReturn, items ...*ua.MonitoredI
}

var res *ua.CreateMonitoredItemsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})

Expand All @@ -146,7 +153,14 @@ func (s *Subscription) Monitor(ts ua.TimestampsToReturn, items ...*ua.MonitoredI
return res, err
}

// Note: Starting with v0.5 this method will require a context
// and the corresponding XXXWithContext(ctx) method will be removed.
func (s *Subscription) Unmonitor(monitoredItemIDs ...uint32) (*ua.DeleteMonitoredItemsResponse, error) {
return s.UnmonitorWithContext(context.Background(), monitoredItemIDs...)
}

// Note: Starting with v0.5 this method is superseded by the non 'WithContext' method.
func (s *Subscription) UnmonitorWithContext(ctx context.Context, monitoredItemIDs ...uint32) (*ua.DeleteMonitoredItemsResponse, error) {
stats.Subscription().Add("Unmonitor", 1)
stats.Subscription().Add("UnmonitoredItems", int64(len(monitoredItemIDs)))

Expand All @@ -156,7 +170,7 @@ func (s *Subscription) Unmonitor(monitoredItemIDs ...uint32) (*ua.DeleteMonitore
}

var res *ua.DeleteMonitoredItemsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})

Expand All @@ -172,7 +186,14 @@ func (s *Subscription) Unmonitor(monitoredItemIDs ...uint32) (*ua.DeleteMonitore
return res, err
}

// Note: Starting with v0.5 this method will require a context
// and the corresponding XXXWithContext(ctx) method will be removed.
func (s *Subscription) ModifyMonitoredItems(ts ua.TimestampsToReturn, items ...*ua.MonitoredItemModifyRequest) (*ua.ModifyMonitoredItemsResponse, error) {
return s.ModifyMonitoredItemsWithContext(context.Background(), ts, items...)
}

// Note: Starting with v0.5 this method is superseded by the non 'WithContext' method.
func (s *Subscription) ModifyMonitoredItemsWithContext(ctx context.Context, ts ua.TimestampsToReturn, items ...*ua.MonitoredItemModifyRequest) (*ua.ModifyMonitoredItemsResponse, error) {
stats.Subscription().Add("ModifyMonitoredItems", 1)
stats.Subscription().Add("ModifiedMonitoredItems", int64(len(items)))

Expand All @@ -191,7 +212,7 @@ func (s *Subscription) ModifyMonitoredItems(ts ua.TimestampsToReturn, items ...*
ItemsToModify: items,
}
var res *ua.ModifyMonitoredItemsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
if err != nil {
Expand Down Expand Up @@ -222,7 +243,15 @@ func (s *Subscription) ModifyMonitoredItems(ts ua.TimestampsToReturn, items ...*
// SetTriggering sends a request to the server to add and/or remove triggering links from a triggering item.
// To add links from a triggering item to an item to report provide the server assigned ID(s) in the `add` argument.
// To remove links from a triggering item to an item to report provide the server assigned ID(s) in the `remove` argument.
//
// Note: Starting with v0.5 this method will require a context
// and the corresponding XXXWithContext(ctx) method will be removed.
func (s *Subscription) SetTriggering(triggeringItemID uint32, add, remove []uint32) (*ua.SetTriggeringResponse, error) {
return s.SetTriggeringWithContext(context.Background(), triggeringItemID, add, remove)
}

// Note: Starting with v0.5 this method is superseded by the non 'WithContext' method.
func (s *Subscription) SetTriggeringWithContext(ctx context.Context, triggeringItemID uint32, add, remove []uint32) (*ua.SetTriggeringResponse, error) {
stats.Subscription().Add("SetTriggering", 1)

// Part 4, 5.12.5.2 SetTriggering Service Parameters
Expand All @@ -234,7 +263,7 @@ func (s *Subscription) SetTriggering(triggeringItemID uint32, add, remove []uint
}

var res *ua.SetTriggeringResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
return res, err
Expand Down Expand Up @@ -319,7 +348,7 @@ func (s *Subscription) recreate(ctx context.Context) error {
SubscriptionIDs: []uint32{s.SubscriptionID},
}
var res *ua.DeleteSubscriptionsResponse
_ = s.c.Send(req, func(v interface{}) error {
_ = s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
dlog.Print("subscription deleted")
Expand All @@ -336,7 +365,7 @@ func (s *Subscription) recreate(ctx context.Context) error {
Priority: params.Priority,
}
var res *ua.CreateSubscriptionResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
if err != nil {
Expand Down Expand Up @@ -379,7 +408,7 @@ func (s *Subscription) recreate(ctx context.Context) error {
}

var res *ua.CreateMonitoredItemsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
if err != nil {
Expand Down