Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kung-foo committed Jul 30, 2019
1 parent 7cdc8df commit 3697230
Show file tree
Hide file tree
Showing 2 changed files with 432 additions and 0 deletions.
126 changes: 126 additions & 0 deletions examples/monitor/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"

"github.com/gopcua/opcua"
"github.com/gopcua/opcua/debug"
"github.com/gopcua/opcua/monitor"
"github.com/gopcua/opcua/ua"
)

func main() {
var (
endpoint = flag.String("endpoint", "opc.tcp://localhost:4840", "OPC UA Endpoint URL")
policy = flag.String("policy", "", "Security policy: None, Basic128Rsa15, Basic256, Basic256Sha256. Default: auto")
mode = flag.String("mode", "", "Security mode: None, Sign, SignAndEncrypt. Default: auto")
certFile = flag.String("cert", "", "Path to cert.pem. Required for security mode/policy != None")
keyFile = flag.String("key", "", "Path to private key.pem. Required for security mode/policy != None")
nodeID = flag.String("node", "", "node id to subscribe to")
)
flag.BoolVar(&debug.Enable, "debug", false, "enable debug logging")
flag.Parse()

// log.SetFlags(0)

signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
<-signalCh
cancel()
}()

endpoints, err := opcua.GetEndpoints(*endpoint)
if err != nil {
log.Fatal(err)
}

ep := opcua.SelectEndpoint(endpoints, *policy, ua.MessageSecurityModeFromString(*mode))
if ep == nil {
log.Fatal("Failed to find suitable endpoint")
}

fmt.Println("*", ep.SecurityPolicyURI, ep.SecurityMode)

opts := []opcua.Option{
opcua.SecurityPolicy(*policy),
opcua.SecurityModeString(*mode),
opcua.CertificateFile(*certFile),
opcua.PrivateKeyFile(*keyFile),
opcua.AuthAnonymous(),
opcua.SecurityFromEndpoint(ep, ua.UserTokenTypeAnonymous),
}

c := opcua.NewClient(ep.EndpointURL, opts...)
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}

defer c.Close()

m, err := monitor.New(c)
if err != nil {
log.Fatal(err)
}

m.SetErrorHandler(func(_ *opcua.Client, sub *monitor.Subscription, err error) {
log.Printf(err.Error())
})

// start callback-based subscription
go func() {
sub, err := m.Subscribe(ctx, func(n *ua.NodeID, v *ua.DataValue) {
log.Printf("callback: node=%s value=%v", n, v.Value.Value())
}, *nodeID)

if err != nil {
log.Fatal(err)
}

defer sub.Unsubscribe()
defer printStats(sub)

<-ctx.Done()
}()

// start channel-based subscription
go func() {
ch := make(chan *monitor.DataChangeMessage, 16)
sub, err := m.ChanSubscribe(ctx, ch, *nodeID)

if err != nil {
log.Fatal(err)
}

defer sub.Unsubscribe()
defer printStats(sub)

for {
select {
case <-ctx.Done():
return
case msg := <-ch:
if msg.Error != nil {
log.Printf(msg.Error.Error())
continue
}
log.Printf("chan: node=%s value=%v", msg.NodeID, msg.Value.Value())
}
}
}()

<-ctx.Done()
}

func printStats(sub *monitor.Subscription) {
log.Printf("stats: delivered=%d dropped=%d", sub.Delivered(), sub.Dropped())
}
Loading

0 comments on commit 3697230

Please sign in to comment.