Skip to content

Commit

Permalink
Publish (#179)
Browse files Browse the repository at this point in the history
  • Loading branch information
wsnotify authored and magiconair committed Apr 30, 2019
1 parent 5f710d7 commit 30ab0b8
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 0 deletions.
98 changes: 98 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,3 +426,101 @@ func (c *Client) Subscribe(intv time.Duration) (*Subscription, error) {
})
return &Subscription{res}, err
}

type PublishNotificationData struct {
SubscriptionID uint32
Error error
Value interface{}
}

func (c *Client) Publish(notif chan<- PublishNotificationData) {
// Empty SubscriptionAcknowledgements for first PublishRequest
var acks = make([]*ua.SubscriptionAcknowledgement, 0)

for {
req := &ua.PublishRequest{
SubscriptionAcknowledgements: acks,
}

var res *ua.PublishResponse
err := c.Send(req, func(v interface{}) error {
r, ok := v.(*ua.PublishResponse)
if !ok {
return fmt.Errorf("invalid response: %T", v)
}
res = r
return nil
})
if err != nil {
notif <- PublishNotificationData{Error: err}
continue
}

// Check for errors
status := ua.StatusOK
for _, res := range res.Results {
if res != ua.StatusOK {
status = res
break
}
}

if status != ua.StatusOK {
notif <- PublishNotificationData{
SubscriptionID: res.SubscriptionID,
Error: status,
}
continue
}

// Prepare SubscriptionAcknowledgement for next PublishRequest
acks = make([]*ua.SubscriptionAcknowledgement, 0)
for _, i := range res.AvailableSequenceNumbers {
ack := &ua.SubscriptionAcknowledgement{
SubscriptionID: res.SubscriptionID,
SequenceNumber: i,
}
acks = append(acks, ack)
}

if res.NotificationMessage == nil {
notif <- PublishNotificationData{
SubscriptionID: res.SubscriptionID,
Error: fmt.Errorf("empty NotificationMessage"),
}
continue
}

// Part 4, 7.21 NotificationMessage
for _, data := range res.NotificationMessage.NotificationData {
// Part 4, 7.20 NotificationData parameters
if data == nil || data.Value == nil {
notif <- PublishNotificationData{
SubscriptionID: res.SubscriptionID,
Error: fmt.Errorf("missing NotificationData parameter"),
}
continue
}

switch data.Value.(type) {
// Part 4, 7.20.2 DataChangeNotification parameter
// Part 4, 7.20.3 EventNotificationList parameter
// Part 4, 7.20.4 StatusChangeNotification parameter
case *ua.DataChangeNotification,
*ua.EventNotificationList,
*ua.StatusChangeNotification:
notif <- PublishNotificationData{
SubscriptionID: res.SubscriptionID,
Value: data.Value,
}

// Error
default:
notif <- PublishNotificationData{
SubscriptionID: res.SubscriptionID,
Error: fmt.Errorf("unknown NotificationData parameter: %T", data.Value),
}
}
}
}
}
48 changes: 48 additions & 0 deletions examples/publish/publish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2018-2019 opcua authors. All rights reserved.
// Use of this source code is governed by a MIT-style license that can be
// found in the LICENSE file.

package main

import (
"flag"
"log"

"github.com/gopcua/opcua"
"github.com/gopcua/opcua/debug"
"github.com/gopcua/opcua/ua"
)

func main() {
endpoint := flag.String("endpoint", "opc.tcp://localhost:4840", "OPC UA Endpoint URL")
flag.BoolVar(&debug.Enable, "debug", false, "enable debug logging")
flag.Parse()
log.SetFlags(0)

c := opcua.NewClient(*endpoint)
if err := c.Connect(); err != nil {
log.Fatal(err)
}
defer c.Close()

ch := make(chan opcua.PublishNotificationData)
go c.Publish(ch)

for {
res := <-ch
if res.Error != nil {
log.Print(res.Error)
continue
}

switch x := res.Value.(type) {
case *ua.DataChangeNotification:
for _, item := range x.MonitoredItems {
data, ok := item.Value.Value.Value.(float64)
if ok {
log.Printf("%g", data)
}
}
}
}
}

0 comments on commit 30ab0b8

Please sign in to comment.