diff --git a/client.go b/client.go index 12ced856..3fad6841 100644 --- a/client.go +++ b/client.go @@ -426,3 +426,101 @@ func (c *Client) Subscribe(intv time.Duration) (*Subscription, error) { }) return &Subscription{res}, err } + +type PublishNotificationData struct { + SubscriptionID uint32 + Error error + Value interface{} +} + +func (c *Client) Publish(notif chan<- PublishNotificationData) { + // Empty SubscriptionAcknowledgements for first PublishRequest + var acks = make([]*ua.SubscriptionAcknowledgement, 0) + + for { + req := &ua.PublishRequest{ + SubscriptionAcknowledgements: acks, + } + + var res *ua.PublishResponse + err := c.Send(req, func(v interface{}) error { + r, ok := v.(*ua.PublishResponse) + if !ok { + return fmt.Errorf("invalid response: %T", v) + } + res = r + return nil + }) + if err != nil { + notif <- PublishNotificationData{Error: err} + continue + } + + // Check for errors + status := ua.StatusOK + for _, res := range res.Results { + if res != ua.StatusOK { + status = res + break + } + } + + if status != ua.StatusOK { + notif <- PublishNotificationData{ + SubscriptionID: res.SubscriptionID, + Error: status, + } + continue + } + + // Prepare SubscriptionAcknowledgement for next PublishRequest + acks = make([]*ua.SubscriptionAcknowledgement, 0) + for _, i := range res.AvailableSequenceNumbers { + ack := &ua.SubscriptionAcknowledgement{ + SubscriptionID: res.SubscriptionID, + SequenceNumber: i, + } + acks = append(acks, ack) + } + + if res.NotificationMessage == nil { + notif <- PublishNotificationData{ + SubscriptionID: res.SubscriptionID, + Error: fmt.Errorf("empty NotificationMessage"), + } + continue + } + + // Part 4, 7.21 NotificationMessage + for _, data := range res.NotificationMessage.NotificationData { + // Part 4, 7.20 NotificationData parameters + if data == nil || data.Value == nil { + notif <- PublishNotificationData{ + SubscriptionID: res.SubscriptionID, + Error: fmt.Errorf("missing NotificationData parameter"), + } + continue + } + + switch data.Value.(type) { + // Part 4, 7.20.2 DataChangeNotification parameter + // Part 4, 7.20.3 EventNotificationList parameter + // Part 4, 7.20.4 StatusChangeNotification parameter + case *ua.DataChangeNotification, + *ua.EventNotificationList, + *ua.StatusChangeNotification: + notif <- PublishNotificationData{ + SubscriptionID: res.SubscriptionID, + Value: data.Value, + } + + // Error + default: + notif <- PublishNotificationData{ + SubscriptionID: res.SubscriptionID, + Error: fmt.Errorf("unknown NotificationData parameter: %T", data.Value), + } + } + } + } +} diff --git a/examples/publish/publish.go b/examples/publish/publish.go new file mode 100644 index 00000000..f8bf9e2e --- /dev/null +++ b/examples/publish/publish.go @@ -0,0 +1,48 @@ +// Copyright 2018-2019 opcua authors. All rights reserved. +// Use of this source code is governed by a MIT-style license that can be +// found in the LICENSE file. + +package main + +import ( + "flag" + "log" + + "github.com/gopcua/opcua" + "github.com/gopcua/opcua/debug" + "github.com/gopcua/opcua/ua" +) + +func main() { + endpoint := flag.String("endpoint", "opc.tcp://localhost:4840", "OPC UA Endpoint URL") + flag.BoolVar(&debug.Enable, "debug", false, "enable debug logging") + flag.Parse() + log.SetFlags(0) + + c := opcua.NewClient(*endpoint) + if err := c.Connect(); err != nil { + log.Fatal(err) + } + defer c.Close() + + ch := make(chan opcua.PublishNotificationData) + go c.Publish(ch) + + for { + res := <-ch + if res.Error != nil { + log.Print(res.Error) + continue + } + + switch x := res.Value.(type) { + case *ua.DataChangeNotification: + for _, item := range x.MonitoredItems { + data, ok := item.Value.Value.Value.(float64) + if ok { + log.Printf("%g", data) + } + } + } + } +}