Skip to content

Commit

Permalink
rework secure channel logic
Browse files Browse the repository at this point in the history
  • Loading branch information
kung-foo committed Aug 25, 2020
1 parent 6a25c84 commit 9059bb3
Show file tree
Hide file tree
Showing 8 changed files with 843 additions and 774 deletions.
79 changes: 9 additions & 70 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"crypto/rand"
"fmt"
"io"
"log"
"reflect"
"sort"
Expand Down Expand Up @@ -106,9 +105,6 @@ type Client struct {
subscriptions map[uint32]*Subscription
subMux sync.RWMutex

//cancelMonitor cancels the monitorChannel goroutine
cancelMonitor context.CancelFunc

// once initializes session
once sync.Once
}
Expand Down Expand Up @@ -168,71 +164,33 @@ func (c *Client) Dial(ctx context.Context) error {
if c.sechan != nil {
return errors.Errorf("secure channel already connected")
}

var err error
c.conn, err = uacp.Dial(ctx, c.endpointURL)
if err != nil {
return err
}

c.sechan, err = uasc.NewSecureChannel(c.endpointURL, c.conn, c.cfg)
if err != nil {
_ = c.conn.Close()
return err
}

// Issue #313: decouple the dial context from the monitor context
// mctx must *not* be a child context of 'ctx'. Otherwise, the
// monitor go routine terminates whenever the dial context is done
// which may get triggered unexpectedly by a timer context.
var mctx context.Context
mctx, c.cancelMonitor = context.WithCancel(context.Background())
go c.monitorChannel(mctx)
return c.openSecureChannel(mctx, c.sechan.Open)
}

func (c *Client) openSecureChannel(ctx context.Context, open func() error) error {
if err := open(); err != nil {
c.cancelMonitor()
_ = c.conn.Close()
c.sechan = nil
return err
}
return c.scheduleRenewingToken(ctx)
}

func (c *Client) monitorChannel(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
msg := c.sechan.Receive(ctx)
if msg.Err != nil {
if msg.Err == io.EOF {
debug.Printf("Connection closed")
} else {
debug.Printf("Received error: %s", msg.Err)
}
// todo (dh): apart from the above message, we're ignoring this error because there is nothing watching it
// I'd prefer to have a way to return the error to the upper application.
return
}
debug.Printf("Received unsolicited message from server: %T", msg.V)
}
}
return c.sechan.Open(ctx)
}

// Close closes the session and the secure channel.
func (c *Client) Close() error {
if c.sechan == nil {
return ua.StatusBadServerNotConnected
}
defer c.conn.Close()

// try to close the session but ignore any error
// so that we close the underlying channel and connection.
_ = c.CloseSession()
if c.cancelMonitor != nil {
c.cancelMonitor()
}
return c.sechan.Close()

_ = c.sechan.Close()

return nil
}

var errNotConnected = errors.New("not connected")
Expand Down Expand Up @@ -757,25 +715,6 @@ func (c *Client) HistoryReadRawModified(nodes []*ua.HistoryReadValueID, details
return res, err
}

func (c *Client) scheduleRenewingToken(ctx context.Context) error {
if c.sechan == nil {
return ua.StatusBadServerNotConnected
}
timer := time.NewTimer(time.Duration(0.75*float64(c.sechan.Lifetime())) * time.Millisecond) // 0.75 is from Part 4, Section 5.5.2.1

go func() {
select {
case <-ctx.Done():
timer.Stop()
case <-timer.C:
debug.Printf("renewing security token...")
// Ignore the error. openSecureChannel will close the connection on error and the user will surely notice
_ = c.openSecureChannel(ctx, c.sechan.Renew)
}
}()
return nil
}

// safeAssign implements a type-safe assign from T to *T.
func safeAssign(t, ptrT interface{}) error {
if reflect.TypeOf(t) != reflect.TypeOf(ptrT).Elem() {
Expand Down
2 changes: 2 additions & 0 deletions uacp/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ const hdrlen = 8
// The size of b must be at least ReceiveBufSize. Otherwise,
// the function returns an error.
func (c *Conn) Receive() ([]byte, error) {
// TODO(kung-foo): allow user-specified buffer
// TODO(kung-foo): sync.Pool
b := make([]byte, c.ack.ReceiveBufSize)

if _, err := io.ReadFull(c, b[:hdrlen]); err != nil {
Expand Down
13 changes: 13 additions & 0 deletions uapolicy/securitypolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
package uapolicy

import (
"crypto/rand"
"crypto/rsa"
"io"
"sort"

"github.com/gopcua/opcua/errors"
Expand Down Expand Up @@ -145,6 +147,17 @@ func (e *EncryptionAlgorithm) NonceLength() int {
return e.nonceLength
}

func (e *EncryptionAlgorithm) MakeNonce() ([]byte, error) {
b := make([]byte, e.NonceLength())
// note: we use `rand.Reader` instead of `rand.Read(...)` to ensure that we don't accidentally switch to using
// math/rand (which has a default, fixed seed). Only crypto/rand exposes a global `io.Reader` var.
_, err := io.ReadFull(rand.Reader, b)
if err != nil {
return nil, err
}
return b, nil
}

// EncryptionURI returns the URI for the encryption algorithm as defined
// by the OPC-UA profiles in Part 7
func (e *EncryptionAlgorithm) EncryptionURI() string {
Expand Down
30 changes: 20 additions & 10 deletions uasc/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ func TestMessage(t *testing.T) {
cfg: &Config{
SecurityPolicyURI: "http://gopcua.example/OPCUA/SecurityPolicy#Foo",
},
requestID: 1,
sequenceNumber: 1,
}
instance := &channelInstance{
sc: s,
sequenceNumber: 0,
securityTokenID: 0,
}
m := s.newMessage(
m := instance.newMessage(
&ua.OpenSecureChannelRequest{
RequestHeader: &ua.RequestHeader{
AuthenticationToken: ua.NewTwoByteNodeID(0),
Expand All @@ -41,6 +43,7 @@ func TestMessage(t *testing.T) {
RequestedLifetime: 6000000,
},
id.OpenSecureChannelRequest_Encoding_DefaultBinary,
s.nextRequestID(),
)

// set message size manually, since it is computed in Encode
Expand Down Expand Up @@ -110,18 +113,21 @@ func TestMessage(t *testing.T) {
// RequestedLifetime
0x80, 0x8d, 0x5b, 0x00,
},
}, {
},
{
Name: "MSG",
Struct: func() interface{} {
s := &SecureChannel{
cfg: &Config{
SecurityPolicyURI: "http://gopcua.example/OPCUA/SecurityPolicy#Foo",
},
requestID: 1,
sequenceNumber: 1,
}
instance := &channelInstance{
sc: s,
sequenceNumber: 0,
securityTokenID: 0,
}
m := s.newMessage(
m := instance.newMessage(
&ua.GetEndpointsRequest{
RequestHeader: &ua.RequestHeader{
AuthenticationToken: ua.NewTwoByteNodeID(0),
Expand All @@ -133,6 +139,7 @@ func TestMessage(t *testing.T) {
EndpointURL: "opc.tcp://wow.its.easy:11111/UA/Server",
},
id.GetEndpointsRequest_Encoding_DefaultBinary,
s.nextRequestID(),
)

// set message size manually, since it is computed in Encode
Expand Down Expand Up @@ -185,11 +192,13 @@ func TestMessage(t *testing.T) {
cfg: &Config{
SecurityPolicyURI: "http://gopcua.example/OPCUA/SecurityPolicy#Foo",
},
requestID: 1,
sequenceNumber: 1,
}
instance := &channelInstance{
sc: s,
sequenceNumber: 0,
securityTokenID: 0,
}
m := s.newMessage(
m := instance.newMessage(
&ua.CloseSecureChannelRequest{
RequestHeader: &ua.RequestHeader{
AuthenticationToken: ua.NewTwoByteNodeID(0),
Expand All @@ -200,6 +209,7 @@ func TestMessage(t *testing.T) {
},
},
id.CloseSecureChannelRequest_Encoding_DefaultBinary,
s.nextRequestID(),
)

// set message size manually, since it is computed in Encode
Expand Down
Loading

0 comments on commit 9059bb3

Please sign in to comment.