Skip to content

Commit

Permalink
migrate send request
Browse files Browse the repository at this point in the history
  • Loading branch information
CrowdHailer committed Jan 20, 2022
1 parent f9d631d commit f2488ea
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 13 deletions.
23 changes: 15 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,15 @@ func (c *Client) Connect(ctx context.Context) (err error) {
return err
}

s, err := c.CreateSession(c.cfg.session)
s, err := c.CreateSessionWithContext(ctx, c.cfg.session)
if err != nil {
c.Close()
stats.RecordError(err)

return err
}

if err := c.ActivateSession(s); err != nil {
if err := c.ActivateSessionWithContext(ctx, s); err != nil {
c.Close()
stats.RecordError(err)

Expand Down Expand Up @@ -381,7 +381,7 @@ func (c *Client) monitor(ctx context.Context) {
}

dlog.Printf("trying to restore session")
if err := c.ActivateSession(s); err != nil {
if err := c.ActivateSessionWithContext(ctx, s); err != nil {
dlog.Printf("restore session failed")
action = recreateSession
continue
Expand All @@ -395,13 +395,13 @@ func (c *Client) monitor(ctx context.Context) {
// create a new session to replace the previous one

dlog.Printf("trying to recreate session")
s, err := c.CreateSession(c.cfg.session)
s, err := c.CreateSessionWithContext(ctx, c.cfg.session)
if err != nil {
dlog.Printf("recreate session failed: %v", err)
action = createSecureChannel
continue
}
if err := c.ActivateSession(s); err != nil {
if err := c.ActivateSessionWithContext(ctx, s); err != nil {
dlog.Printf("reactivate session failed: %v", err)
action = createSecureChannel
continue
Expand Down Expand Up @@ -458,7 +458,7 @@ func (c *Client) monitor(ctx context.Context) {
// populated in the previous step.

for _, id := range subsToRepublish {
if err := c.republishSubscription(id, availableSeqs[id]); err != nil {
if err := c.republishSubscription(ctx, id, availableSeqs[id]); err != nil {
dlog.Printf("republish of subscription %d failed", id)
subsToRecreate = append(subsToRecreate, id)
}
Expand Down Expand Up @@ -644,6 +644,9 @@ type Session struct {
//
// See Part 4, 5.6.2
func (c *Client) CreateSession(cfg *uasc.SessionConfig) (*Session, error) {
return c.CreateSessionWithContext(context.Background(), cfg)
}
func (c *Client) CreateSessionWithContext(ctx context.Context, cfg *uasc.SessionConfig) (*Session, error) {
if c.SecureChannel() == nil {
return nil, ua.StatusBadServerNotConnected
}
Expand All @@ -670,7 +673,7 @@ func (c *Client) CreateSession(cfg *uasc.SessionConfig) (*Session, error) {
var s *Session
// for the CreateSessionRequest the authToken is always nil.
// use c.SecureChannel().SendRequest() to enforce this.
err := c.SecureChannel().SendRequest(req, nil, func(v interface{}) error {
err := c.SecureChannel().SendRequestWithContext(ctx, req, nil, func(v interface{}) error {
var res *ua.CreateSessionResponse
if err := safeAssign(v, &res); err != nil {
return err
Expand Down Expand Up @@ -727,7 +730,11 @@ func anonymousPolicyID(endpoints []*ua.EndpointDescription) string {
// session call DetachSession.
//
// See Part 4, 5.6.3

func (c *Client) ActivateSession(s *Session) error {
return c.ActivateSessionWithContext(context.Background(), s)
}
func (c *Client) ActivateSessionWithContext(ctx context.Context, s *Session) error {
if c.SecureChannel() == nil {
return ua.StatusBadServerNotConnected
}
Expand Down Expand Up @@ -776,7 +783,7 @@ func (c *Client) ActivateSession(s *Session) error {
UserIdentityToken: ua.NewExtensionObject(s.cfg.UserIdentityToken),
UserTokenSignature: s.cfg.UserTokenSignature,
}
return c.SecureChannel().SendRequest(req, s.resp.AuthenticationToken, func(v interface{}) error {
return c.SecureChannel().SendRequestWithContext(ctx, req, s.resp.AuthenticationToken, func(v interface{}) error {
var res *ua.ActivateSessionResponse
if err := safeAssign(v, &res); err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *Client) transferSubscriptions(ids []uint32) (*ua.TransferSubscriptionsR
}

// republishSubscriptions sends republish requests for the given subscription id.
func (c *Client) republishSubscription(id uint32, availableSeq []uint32) error {
func (c *Client) republishSubscription(ctx context.Context, id uint32, availableSeq []uint32) error {
c.subMux.RLock()
defer c.subMux.RUnlock()

Expand All @@ -123,7 +123,7 @@ func (c *Client) republishSubscription(id uint32, availableSeq []uint32) error {
}

debug.Printf("republishing subscription %d", sub.SubscriptionID)
if err := c.sendRepublishRequests(sub, availableSeq); err != nil {
if err := c.sendRepublishRequests(ctx, sub, availableSeq); err != nil {
status, ok := err.(ua.StatusCode)
if !ok {
return err
Expand All @@ -144,7 +144,7 @@ func (c *Client) republishSubscription(id uint32, availableSeq []uint32) error {
// sendRepublishRequests sends republish requests for the given subscription
// until it gets a BadMessageNotAvailable which implies that there are no
// more messages to restore.
func (c *Client) sendRepublishRequests(sub *Subscription, availableSeq []uint32) error {
func (c *Client) sendRepublishRequests(ctx context.Context, sub *Subscription, availableSeq []uint32) error {
// todo(fs): check if sub.nextSeq is in the available sequence numbers
// todo(fs): if not then we need to decide whether we fail b/c of data loss
// todo(fs): or whether we log it and continue.
Expand All @@ -170,7 +170,7 @@ func (c *Client) sendRepublishRequests(sub *Subscription, availableSeq []uint32)

debug.Printf("RepublishRequest: req=%s", debug.ToJSON(req))
var res *ua.RepublishResponse
err := c.SecureChannel().SendRequest(req, c.Session().resp.AuthenticationToken, func(v interface{}) error {
err := c.SecureChannel().SendRequestWithContext(ctx, req, c.Session().resp.AuthenticationToken, func(v interface{}) error {
return safeAssign(v, &res)
})
debug.Printf("RepublishResponse: res=%s err=%v", debug.ToJSON(res), err)
Expand Down
6 changes: 5 additions & 1 deletion uasc/secure_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,10 @@ func (s *SecureChannel) SendRequest(req ua.Request, authToken *ua.NodeID, h func
return s.SendRequestWithTimeout(req, authToken, s.cfg.RequestTimeout, h)
}

func (s *SecureChannel) SendRequestWithContext(ctx context.Context, req ua.Request, authToken *ua.NodeID, h func(interface{}) error) error {
return s.SendRequestWithTimeoutWithContext(ctx, req, authToken, s.cfg.RequestTimeout, h)
}

func (s *SecureChannel) SendRequestWithTimeout(req ua.Request, authToken *ua.NodeID, timeout time.Duration, h func(interface{}) error) error {
return s.SendRequestWithTimeoutWithContext(context.Background(), req, authToken, timeout, h)
}
Expand Down Expand Up @@ -848,7 +852,7 @@ func (s *SecureChannel) close() error {
default:
}

err := s.SendRequest(&ua.CloseSecureChannelRequest{}, nil, nil)
err := s.SendRequestWithContext(context.Background(), &ua.CloseSecureChannelRequest{}, nil, nil)
if err != nil {
return err
}
Expand Down

0 comments on commit f2488ea

Please sign in to comment.