Skip to content

Commit

Permalink
Client side chunking
Browse files Browse the repository at this point in the history
  • Loading branch information
diericd authored and magiconair committed Nov 8, 2021
1 parent 6e89091 commit eb4a59b
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 31 deletions.
81 changes: 66 additions & 15 deletions uasc/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package uasc

import (
"math"

"github.com/gopcua/opcua/errors"
"github.com/gopcua/opcua/ua"
)
Expand Down Expand Up @@ -111,25 +113,74 @@ func (m *Message) Decode(b []byte) (int, error) {
}

func (m *Message) Encode() ([]byte, error) {
body := ua.NewBuffer(nil)
chunks, err := m.EncodeChunks(math.MaxUint32)
if err != nil {
return nil, err
}
return chunks[0], nil
}

func (m *Message) EncodeChunks(maxBodySize uint32) ([][]byte, error) {
dataBody := ua.NewBuffer(nil)
dataBody.WriteStruct(m.TypeID)
dataBody.WriteStruct(m.Service)

if dataBody.Error() != nil {
return nil, dataBody.Error()
}

nrChunks := uint32(dataBody.Len())/(maxBodySize) + 1
chunks := make([][]byte, nrChunks)

switch m.Header.MessageType {
case "OPN":
body.WriteStruct(m.AsymmetricSecurityHeader)
partialHeader := ua.NewBuffer(nil)
partialHeader.WriteStruct(m.AsymmetricSecurityHeader)
partialHeader.WriteStruct(m.SequenceHeader)

if partialHeader.Error() != nil {
return nil, partialHeader.Error()
}

m.Header.MessageSize = uint32(12 + partialHeader.Len() + dataBody.Len())
buf := ua.NewBuffer(nil)
buf.WriteStruct(m.Header)
buf.Write(partialHeader.Bytes())
buf.Write(dataBody.Bytes())

return [][]byte{buf.Bytes()}, buf.Error()

case "CLO", "MSG":
body.WriteStruct(m.SymmetricSecurityHeader)

for i := uint32(0); i < nrChunks-1; i++ {
m.Header.MessageSize = maxBodySize + 24
m.Header.ChunkType = ChunkTypeIntermediate
chunk := ua.NewBuffer(nil)
chunk.WriteStruct(m.Header)
chunk.WriteStruct(m.SymmetricSecurityHeader)
chunk.WriteStruct(m.SequenceHeader)
chunk.Write(dataBody.ReadN(int(maxBodySize)))
if chunk.Error() != nil {
return nil, chunk.Error()
}

chunks[i] = chunk.Bytes()
}

m.Header.ChunkType = ChunkTypeFinal
m.Header.MessageSize = uint32(24 + dataBody.Len())
chunk := ua.NewBuffer(nil)
chunk.WriteStruct(m.Header)
chunk.WriteStruct(m.SymmetricSecurityHeader)
chunk.WriteStruct(m.SequenceHeader)
chunk.Write(dataBody.Bytes())
if chunk.Error() != nil {
return nil, chunk.Error()
}

chunks[nrChunks-1] = chunk.Bytes()
return chunks, nil
default:
return nil, errors.Errorf("invalid message type %q", m.Header.MessageType)
}
body.WriteStruct(m.SequenceHeader)
body.WriteStruct(m.TypeID)
body.WriteStruct(m.Service)
if body.Error() != nil {
return nil, body.Error()
}

m.Header.MessageSize = uint32(12 + body.Len())
buf := ua.NewBuffer(nil)
buf.WriteStruct(m.Header)
buf.Write(body.Bytes())
return buf.Bytes(), buf.Error()
}
43 changes: 27 additions & 16 deletions uasc/secure_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"crypto/rsa"
"crypto/x509"
"encoding/binary"
"io"
"math"
"sync"
Expand Down Expand Up @@ -526,6 +527,7 @@ func (s *SecureChannel) open(ctx context.Context, instance *channelInstance, req
reqID := s.nextRequestID()

s.openingInstance.algo = algo
s.openingInstance.SetMaximumBodySize(int(s.c.SendBufSize()))

localNonce, err := algo.MakeNonce()
if err != nil {
Expand Down Expand Up @@ -565,6 +567,8 @@ func (s *SecureChannel) handleOpenSecureChannelResponse(resp *ua.OpenSecureChann
return err
}

instance.SetMaximumBodySize(int(s.c.SendBufSize()))

s.instancesMu.Lock()
defer s.instancesMu.Unlock()

Expand Down Expand Up @@ -754,16 +758,6 @@ func (s *SecureChannel) sendAsyncWithTimeout(
return nil, err
}

b, err := m.Encode()
if err != nil {
return nil, err
}

b, err = instance.signAndEncrypt(m, b)
if err != nil {
return nil, err
}

var resp chan *response

if respRequired {
Expand All @@ -781,16 +775,33 @@ func (s *SecureChannel) sendAsyncWithTimeout(
s.handlersMu.Unlock()
}

// send the message
var n int
if n, err = s.c.Write(b); err != nil {
chunks, err := m.EncodeChunks(instance.maxBodySize)
if err != nil {
return nil, err
}

atomic.AddUint64(&instance.bytesSent, uint64(n))
atomic.AddUint32(&instance.messagesSent, 1)
for i, chunk := range chunks {
if i > 0 { // fix sequence number on subsequent chunks
number := instance.nextSequenceNumber()
binary.LittleEndian.PutUint32(chunk[16:], uint32(number))
}

chunk, err = instance.signAndEncrypt(m, chunk)
if err != nil {
return nil, err
}

debug.Printf("uasc %d/%d: send %T with %d bytes", s.c.ID(), reqID, req, len(b))
// send the message
var n int
if n, err = s.c.Write(chunk); err != nil {
return nil, err
}

atomic.AddUint64(&instance.bytesSent, uint64(n))
atomic.AddUint32(&instance.messagesSent, 1)

debug.Printf("uasc %d/%d: send %T with %d bytes", s.c.ID(), reqID, req, len(chunk))
}

return resp, nil
}
Expand Down
18 changes: 18 additions & 0 deletions uasc/secure_channel_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type channelInstance struct {
securityTokenID uint32
sequenceNumber uint32
algo *uapolicy.EncryptionAlgorithm
maxBodySize uint32

messagesSent uint32
// messagesReceived uint32
Expand Down Expand Up @@ -130,6 +131,23 @@ func (c *channelInstance) newMessage(srv interface{}, typeID uint16, requestID u
}
}

func (c *channelInstance) SetMaximumBodySize(chunkSize int) {
sequenceHeaderSize := 8
headerSize := 12
symmetricAlgorithmHeader := 4

// this is the formula proposed by OPCUA - source node-opcua
maxBodySize :=
c.algo.PlaintextBlockSize()*
((chunkSize-headerSize-symmetricAlgorithmHeader-c.algo.SignatureLength()-1)/c.algo.BlockSize()) -
sequenceHeaderSize
c.maxBodySize = uint32(maxBodySize)

// this is the formula proposed by ERN - source node-opcua
// maxBlock := (chunkSize - headerSize) / c.algo.BlockSize()
// c.maxBodySize = c.algo.PlaintextBlockSize()*maxBlock - sequenceHeaderSize - c.algo.SignatureLength() - 1
}

// signAndEncrypt encrypts the message bytes stored in b and returns the
// data signed and encrypted per the security policy information from the
// secure channel.
Expand Down

0 comments on commit eb4a59b

Please sign in to comment.