Skip to content

Commit

Permalink
feat: implement opcua-centrifugo command
Browse files Browse the repository at this point in the history
  • Loading branch information
cailloumajor committed Mar 8, 2022
1 parent 5f4cdc7 commit 2500e39
Show file tree
Hide file tree
Showing 18 changed files with 755 additions and 255 deletions.
20 changes: 13 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,17 @@ USAGE
opcua-centrifugo [options]

OPTIONS
Flag Env Var Description
-debug log debug information (default: false)
-opcua-cert-file OPCUA_CERT_FILE certificate file path for OPC-UA secure channel (optional)
-opcua-key-file OPCUA_KEY_FILE private key file path for OPC-UA secure channel (optional)
-opcua-password OPCUA_PASSWORD password for OPC-UA authentication (optional)
-opcua-server-url OPCUA_SERVER_URL OPC-UA server endpoint URL (default: opc.tcp://127.0.0.1:4840)
-opcua-user OPCUA_USER user name for OPC-UA authentication (optional)
Flag Env Var Description
-centrifugo-api-address CENTRIFUGO_API_ADDRESS Centrifugo API endpoint
-centrifugo-api-key CENTRIFUGO_API_KEY Centrifugo API key
-centrifugo-namespace CENTRIFUGO_NAMESPACE Centrifugo channel namespace for this instance
-debug log debug information (default: false)
-opcua-cert-file OPCUA_CERT_FILE certificate file path for OPC-UA secure channel (optional)
-opcua-client-timeout OPCUA_CLIENT_TIMEOUT timeout for connecting the OPC-UA client (default: 10s)
-opcua-key-file OPCUA_KEY_FILE private key file path for OPC-UA secure channel (optional)
-opcua-password OPCUA_PASSWORD password for OPC-UA authentication (optional)
-opcua-server-url OPCUA_SERVER_URL OPC-UA server endpoint URL (default: opc.tcp://127.0.0.1:4840)
-opcua-tidy-interval OPCUA_TIDY_INTERVAL interval at which to tidy-up OPC-UA subscriptions (default: 30s)
-opcua-user OPCUA_USER user name for OPC-UA authentication (optional)
-proxy-listen PROXY_LISTEN Centrifugo proxy listen address (default: :8080)
```
38 changes: 38 additions & 0 deletions cmd/opcua-centrifugo/intervals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"context"
"fmt"
"time"

"github.com/cailloumajor/opcua-centrifugo/internal/centrifugo"
"github.com/centrifugal/gocent/v3"
)

//go:generate moq -out intervals_mocks_test.go . CentrifugoChannels

// CentrifugoChannels is a consumer contract modelling a Centrifugo channels query.
type CentrifugoChannels interface {
Channels(ctx context.Context, opts ...gocent.ChannelsOption) (gocent.ChannelsResult, error)
}

// ChannelIntervals returns the intervals of the Centrifugo channels in the given namespace.
func ChannelIntervals(ctx context.Context, ch CentrifugoChannels, ns string) ([]time.Duration, error) {
pat := fmt.Sprint(ns, centrifugo.NsSeparator, "*")
res, err := ch.Channels(ctx, gocent.WithPattern(pat))
if err != nil {
return nil, fmt.Errorf("error querying channels: %w", err)
}

var durations []time.Duration

for chName := range res.Channels {
c, err := centrifugo.ParseChannel(chName, ns)
if err != nil {
continue
}
durations = append(durations, c.Interval())
}

return durations, nil
}
81 changes: 81 additions & 0 deletions cmd/opcua-centrifugo/intervals_mocks_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 69 additions & 0 deletions cmd/opcua-centrifugo/intervals_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main_test

import (
"context"
"reflect"
"testing"
"time"

. "github.com/cailloumajor/opcua-centrifugo/cmd/opcua-centrifugo"
"github.com/cailloumajor/opcua-centrifugo/internal/testutils"
"github.com/centrifugal/gocent/v3"
)

func TestChannelsInterval(t *testing.T) {
cases := []struct {
name string
channelsError bool
expectError bool
expectIntervals []time.Duration
}{
{
name: "ChannelsError",
channelsError: true,
expectError: true,
expectIntervals: nil,
},
{
name: "Success",
channelsError: false,
expectError: false,
expectIntervals: []time.Duration{2 * time.Second, 5 * time.Second},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
mockedCentrifugoChannels := &CentrifugoChannelsMock{
ChannelsFunc: func(ctx context.Context, opts ...gocent.ChannelsOption) (gocent.ChannelsResult, error) {
options := &gocent.ChannelsOptions{}
for _, opt := range opts {
opt(options)
}
if got, want := options.Pattern, "ns:*"; got != want {
t.Errorf("Channels call Pattern option: want %q, got %q", want, got)
}
if tc.channelsError {
return gocent.ChannelsResult{}, testutils.ErrTesting
}
return gocent.ChannelsResult{
Channels: map[string]gocent.ChannelInfo{
"ns:ch1@2000": {},
"ch2": {},
"ns:ch3@5000": {},
},
}, nil
},
}

in, err := ChannelIntervals(context.Background(), mockedCentrifugoChannels, "ns")

if msg := testutils.AssertError(t, err, tc.expectError); msg != "" {
t.Error(msg)
}
if got, want := in, tc.expectIntervals; !reflect.DeepEqual(got, want) {
t.Errorf("returned intervals, want %#q, got %#q", want, got)
}
})
}
}
126 changes: 122 additions & 4 deletions cmd/opcua-centrifugo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"io"
"net/http"
"os"
"strings"
"syscall"
"text/tabwriter"
"time"

"github.com/cailloumajor/opcua-centrifugo/internal/opcua"
"github.com/cailloumajor/opcua-centrifugo/internal/proxy"
"github.com/centrifugal/gocent/v3"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/joho/godotenv"
"github.com/oklog/run"
"github.com/peterbourgon/ff"
)
Expand Down Expand Up @@ -43,23 +49,45 @@ func usageFor(fs *flag.FlagSet, out io.Writer) func() {
}
}

func main() {
var opcuaConfig opcua.Config
func checkErr(l log.Logger, err error, code int) {
if err != nil {
level.Info(l).Log("err", err)
os.Exit(code)
}
}

func main() {
var (
opcuaConfig opcua.Config
opcuaClientTimeout time.Duration
opcuaTidyInterval time.Duration
proxyListen string
centrifugoNamespace string
centrifugoClientConfig gocent.Config
)
fs := flag.NewFlagSet("opcua-centrifugo", flag.ExitOnError)
fs.StringVar(&opcuaConfig.ServerURL, "opcua-server-url", "opc.tcp://127.0.0.1:4840", "OPC-UA server endpoint URL")
fs.StringVar(&opcuaConfig.User, "opcua-user", "", "user name for OPC-UA authentication (optional)")
fs.StringVar(&opcuaConfig.Password, "opcua-password", "", "password for OPC-UA authentication (optional)")
fs.StringVar(&opcuaConfig.CertFile, "opcua-cert-file", "", "certificate file path for OPC-UA secure channel (optional)")
fs.StringVar(&opcuaConfig.KeyFile, "opcua-key-file", "", "private key file path for OPC-UA secure channel (optional)")
fs.DurationVar(&opcuaClientTimeout, "opcua-client-timeout", 10*time.Second, "timeout for connecting the OPC-UA client")
fs.DurationVar(&opcuaTidyInterval, "opcua-tidy-interval", 30*time.Second, "interval at which to tidy-up OPC-UA subscriptions")
fs.StringVar(&proxyListen, "proxy-listen", ":8080", "Centrifugo proxy listen address")
fs.StringVar(&centrifugoClientConfig.Addr, "centrifugo-api-address", "", "Centrifugo API endpoint")
fs.StringVar(&centrifugoNamespace, "centrifugo-namespace", "", "Centrifugo channel namespace for this instance")
fs.StringVar(&centrifugoClientConfig.Key, "centrifugo-api-key", "", "Centrifugo API key")
debug := fs.Bool("debug", false, "log debug information")
fs.Usage = usageFor(fs, os.Stderr)

logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))

if err := godotenv.Load(); err != nil && !errors.Is(err, os.ErrNotExist) {
checkErr(log.With(logger, "during", "env file loading"), err, 1)
}

if err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix()); err != nil {
level.Info(logger).Log("during", "flags parsing", "err", err)
os.Exit(2)
checkErr(log.With(logger, "during", "flags parsing"), err, 2)
}

loglevel := level.AllowInfo()
Expand All @@ -68,7 +96,97 @@ func main() {
}
logger = level.NewFilter(logger, loglevel)

var opcClient *opcua.Client
var opcMonitor *opcua.Monitor
func() {
l := log.With(logger, "during", "OPC-UA client creation")
ctx, cancel := context.WithTimeout(context.Background(), opcuaClientTimeout)
defer cancel()
sec, err := opcua.NewSecurity(&opcuaConfig, opcua.DefaultSecurityOptsProvider{})
checkErr(l, err, 1)
opcClient, err = opcua.NewClient(ctx, &opcuaConfig, opcua.DefaultClientExtDeps{}, sec)
checkErr(l, err, 1)
opcMonitor = opcua.NewMonitor(opcClient)
}()

proxy := proxy.NewProxy(opcMonitor, proxy.DefaultCentrifugoChannelParser{}, centrifugoNamespace)

centrifugoClient := gocent.New(centrifugoClientConfig)

var g run.Group

{
proxyLogger := log.With(logger, "component", "proxy")
srv := http.Server{
Addr: proxyListen,
Handler: proxy,
}
g.Add(func() error {
level.Info(proxyLogger).Log("listen", proxyListen)
return srv.ListenAndServe()
}, func(err error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
level.Debug(proxyLogger).Log("msg", "shutting down")
if err := srv.Shutdown(ctx); err != nil {
level.Info(proxyLogger).Log("during", "Shutdown", "err", err)
}
})
}

{
monitorLogger := log.With(logger, "component", "OPC-UA monitor")
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
c, d, err := opcMonitor.GetDataChange(ctx)
if err != nil {
level.Info(monitorLogger).Log("during", "GetDataChange", "err", err)
}
if _, err := centrifugoClient.Publish(ctx, c, d); err != nil {
level.Info(monitorLogger).Log("during", "Publish", "err", err)
}
}
}
}, func(err error) {
cancel()
stopContext, stopCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer stopCancel()
level.Debug(monitorLogger).Log("msg", "stopping")
opcMonitor.Stop(stopContext)
})
}

{
tidyLogger := log.With(logger, "component", "tidy routine")
ctx, cancel := context.WithCancel(context.Background())
ticker := time.NewTicker(opcuaTidyInterval)
g.Add(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
ints, err := ChannelIntervals(ctx, centrifugoClient, centrifugoNamespace)
if err != nil {
level.Info(tidyLogger).Log("during", "ChannelIntervals", "err", err)
}
errs := opcMonitor.Purge(ctx, ints)
for _, err := range errs {
level.Info(tidyLogger).Log("during", "monitor purge", "err", err)
}
}
}
}, func(err error) {
ticker.Stop()
cancel()
})
}

g.Add(run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM))

level.Info(logger).Log("exit", g.Run())
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ module github.com/cailloumajor/opcua-centrifugo
go 1.17

require (
github.com/centrifugal/gocent/v3 v3.2.0
github.com/go-kit/log v0.2.0
github.com/gopcua/opcua v0.3.1
github.com/joho/godotenv v1.4.0
github.com/oklog/run v1.1.0
github.com/peterbourgon/ff v1.7.1
)
Expand Down
Loading

0 comments on commit 2500e39

Please sign in to comment.