Skip to content

Commit

Permalink
chore!: change Centrifugo subscription contract
Browse files Browse the repository at this point in the history
  • Loading branch information
cailloumajor committed Feb 28, 2022
1 parent a8208b8 commit f785280
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 97 deletions.
10 changes: 4 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@ A microservice to proxy OPC-UA data change subscription through Centrifugo.
[1]: https://centrifugal.dev/docs/server/proxy#subscribe-proxy
[2]: https://centrifugal.dev/docs/server/channels#channel-namespaces

- A Centrifugo server is configured to [proxy subscriptions][1] to this service.
- Clients must subscribe to Centrifugo channels with following characteristics:
- [Namespace][2]: `opcua`
- Channel name: semicolon-separated fields (e.g. `s=MyNode;30000`), as following, in the same order:
- string notation of the OPC-UA NodeID identifier type and identifier
- publishing interval of OPC-UA notification messages (integer, in milliseconds)
- A Centrifugo server (at least v3.1.1) is configured to [proxy subscriptions][1] to this service.
- Clients interested in OPC-UA values changes subscribe to Centrifugo with following request fields:
- *Channel*: `opcua:` [namespace][2], followed by the requested publishing interval in milliseconds.
- *Data*: a JSON array of all individual nodes to monitor (structures and arrays must be flattened).

## Data flow

Expand Down
48 changes: 48 additions & 0 deletions internal/opcua/interval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package opcua

import (
"fmt"
"strconv"
"strings"
"time"
)

const channelPrefix = "opcua:"

type sentinelError string

func (e sentinelError) Error() string {
return string(e)
}

// ErrNotOpcUaChannel is issued when the channel is not suitable for OPC-UA.
const ErrNotOpcUaChannel = sentinelError("not an OPC-UA suitable channel")

// PublishingInterval represents a publishing interval.
type PublishingInterval time.Duration

// ParseChannel parses a Centrifugo channel into a publishing interval.
//
// See "Specifications" section in README.md for the format of the channel.
func ParseChannel(s string) (PublishingInterval, error) {
if !strings.HasPrefix(s, channelPrefix) {
return 0, ErrNotOpcUaChannel
}

cn := strings.TrimPrefix(s, channelPrefix)

ms, err := strconv.ParseUint(cn, 10, 64)
switch {
case err != nil:
return 0, fmt.Errorf("error parsing interval: %w", err)
case ms > uint64(time.Duration(1<<63-1).Milliseconds()):
return 0, fmt.Errorf("interval too big: %d", ms)
}

return PublishingInterval(time.Duration(ms) * time.Millisecond), nil
}

// Channel returns the Centrifugo channel name for this monitored node.
func (p PublishingInterval) Channel() string {
return fmt.Sprint(channelPrefix, time.Duration(p).Milliseconds())
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,19 @@ package opcua_test
import (
"errors"
"testing"
"time"

. "github.com/cailloumajor/opcua-centrifugo/internal/opcua"
"github.com/cailloumajor/opcua-centrifugo/internal/testutils"
)

func TestParseChannelSuccess(t *testing.T) {
const channel = `opcua:s="node1"."node2";1800000`
const channel = `opcua:1800000`

c, err := ParseChannel(channel)

if msg := testutils.AssertError(t, err, false); msg != "" {
t.Errorf("ParseChannel(): %s", msg)
}
if got, want := c.Node, `s="node1"."node2"`; got != want {
t.Errorf("Node member: want %q, got %q", want, got)
}
if got, want := c.Interval, 30*time.Minute; got != want {
t.Errorf("Interval member: want %v, got %v", want, got)
}
if got, want := c.Channel(), channel; got != want {
t.Errorf("Channel() method: want %q, got %q", want, got)
}
Expand All @@ -36,37 +29,27 @@ func TestParseChannelError(t *testing.T) {
}{
{
name: "NoNamespace",
input: `s="node1"."node2"`,
input: `30000`,
expectNotOpcUaChannel: true,
},
{
name: "NotOpcUaNamespace",
input: `ns:s="node1"."node2"`,
input: `ns:30000`,
expectNotOpcUaChannel: true,
},
{
name: "MissingInterval",
input: `opcua:s="node1"."node2"`,
expectNotOpcUaChannel: false,
},
{
name: "TooManySemicolons",
input: `opcua:ns=2;s="node1"."node2";1800000`,
expectNotOpcUaChannel: false,
},
{
name: "IntervalParsingError",
input: `opcua:s="node1"."node2";interval`,
input: `opcua:interval`,
expectNotOpcUaChannel: false,
},
{
name: "NegativeInterval",
input: `opcua:s="node1"."node2";-5000`,
input: `opcua:-5000`,
expectNotOpcUaChannel: false,
},
{
name: "IntervalTooBig",
input: `opcua:s="node1"."node2";9223372036855`,
input: `opcua:9223372036855`,
expectNotOpcUaChannel: false,
},
}
Expand Down
4 changes: 2 additions & 2 deletions internal/opcua/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ type Monitor struct {
client ClientProvider

mu sync.Mutex
subs map[time.Duration]Subscription
subs map[PublishingInterval]Subscription
}

// NewMonitor creates an OPC-UA node monitor.
func NewMonitor(ctx context.Context, cfg *Config, c ClientProvider) *Monitor {
return &Monitor{
client: c,
subs: make(map[time.Duration]Subscription),
subs: make(map[PublishingInterval]Subscription),
}
}

Expand Down
4 changes: 1 addition & 3 deletions internal/opcua/monitor_stubs_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package opcua

import "time"

func (m *Monitor) AddSubscription(interval time.Duration, sub Subscription) {
func (m *Monitor) AddSubscription(interval PublishingInterval, sub Subscription) {
m.subs[interval] = sub
}
2 changes: 1 addition & 1 deletion internal/opcua/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestMonitorStop(t *testing.T) {
},
}
mockedSubscriptions[i] = mockedSubscription
m.AddSubscription(time.Duration(i+1)*time.Second, mockedSubscription)
m.AddSubscription(PublishingInterval(time.Duration(i+1)*time.Second), mockedSubscription)
}

errs := m.Stop(context.Background())
Expand Down
62 changes: 0 additions & 62 deletions internal/opcua/monitored_node.go

This file was deleted.

0 comments on commit f785280

Please sign in to comment.