Skip to content

Commit

Permalink
check running nats server for errors
Browse files Browse the repository at this point in the history
  • Loading branch information
wkloucek committed Feb 17, 2022
1 parent 602458a commit 065a273
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 58 deletions.
118 changes: 60 additions & 58 deletions nats/pkg/command/server.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package command

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/cs3org/reva/pkg/events/server"
"github.com/oklog/run"
"github.com/owncloud/ocis/nats/pkg/config"
"github.com/owncloud/ocis/nats/pkg/config/parser"
"github.com/owncloud/ocis/nats/pkg/logging"
"github.com/owncloud/ocis/ocis-pkg/log"
"github.com/owncloud/ocis/nats/pkg/server/nats"
"github.com/urfave/cli/v2"

// TODO: .Logger Option on events/server would make this import redundant
Expand All @@ -28,65 +27,68 @@ func Server(cfg *config.Config) *cli.Command {
},
Action: func(c *cli.Context) error {
logger := logging.Configure(cfg.Service.Name, cfg.Log)
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
err := server.RunNatsServer(server.Host(cfg.Nats.Host), server.Port(cfg.Nats.Port), server.StanOpts(func(o *stanServer.Options) {
o.CustomLogger = &logWrapper{logger}
}))
if err != nil {
return err
}
for {
select {
case <-ch:
// TODO: Should we shut down the NatsServer in a proper way here?
// That would require a reference to the StanServer instance for being able to call
// StanServer.Shutdown() github.com/cs3org/reva/pkg/events/server doesn't provide that
// currently
return nil

gr := run.Group{}
ctx, cancel := func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
}
},
}
}
return context.WithCancel(cfg.Context)
}()

// we need to wrap our logger so we can pass it to the nats server
type logWrapper struct {
logger log.Logger
}
defer cancel()

// Noticef logs a notice statement
func (l *logWrapper) Noticef(format string, v ...interface{}) {
msg := fmt.Sprintf(format, v...)
l.logger.Info().Msg(msg)
}
var natsServer *stanServer.StanServer

// Warnf logs a warning statement
func (l *logWrapper) Warnf(format string, v ...interface{}) {
msg := fmt.Sprintf(format, v...)
l.logger.Warn().Msg(msg)
}
gr.Add(func() error {
var err error

// Fatalf logs a fatal statement
func (l *logWrapper) Fatalf(format string, v ...interface{}) {
msg := fmt.Sprintf(format, v...)
l.logger.Fatal().Msg(msg)
}
natsServer, err = nats.RunNatsServer(
nats.Host(cfg.Nats.Host),
nats.Port(cfg.Nats.Port),
nats.StanOpts(
func(o *stanServer.Options) {
o.CustomLogger = logging.NewLogWrapper(logger)
},
),
)

// Errorf logs an error statement
func (l *logWrapper) Errorf(format string, v ...interface{}) {
msg := fmt.Sprintf(format, v...)
l.logger.Error().Msg(msg)
}
if err != nil {
return err
}

// Debugf logs a debug statement
func (l *logWrapper) Debugf(format string, v ...interface{}) {
msg := fmt.Sprintf(format, v...)
l.logger.Debug().Msg(msg)
}
errChan := make(chan error)

go func() {
for {
// check if NATs server has an encountered an error
if err := natsServer.LastError(); err != nil {
errChan <- err
return
}
if ctx.Err() != nil {
return // context closed
}
time.Sleep(1 * time.Second)
}
}()

// Tracef logs a trace statement
func (l *logWrapper) Tracef(format string, v ...interface{}) {
msg := fmt.Sprintf(format, v...)
l.logger.Trace().Msg(msg)
select {
case <-ctx.Done():
return nil
case err = <-errChan:
return err
}

}, func(_ error) {
logger.Info().
Msg("Shutting down server")

natsServer.Shutdown()
cancel()
})

return gr.Run()
},
}
}
52 changes: 52 additions & 0 deletions nats/pkg/logging/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package logging

import (
"fmt"

"github.com/owncloud/ocis/ocis-pkg/log"
)

func NewLogWrapper(logger log.Logger) *LogWrapper {
return &LogWrapper{logger}
}

// we need to wrap our logger so we can pass it to the nats server
type LogWrapper struct {
logger log.Logger
}

// Noticef logs a notice statement
func (l *LogWrapper) Noticef(format string, v ...interface{}) {
msg := fmt.Sprintf(format, v...)
l.logger.Info().Msg(msg)
}

// Warnf logs a warning statement
func (l *LogWrapper) Warnf(format string, v ...interface{}) {
msg := fmt.Sprintf(format, v...)
l.logger.Warn().Msg(msg)
}

// Fatalf logs a fatal statement
func (l *LogWrapper) Fatalf(format string, v ...interface{}) {
msg := fmt.Sprintf(format, v...)
l.logger.Fatal().Msg(msg)
}

// Errorf logs an error statement
func (l *LogWrapper) Errorf(format string, v ...interface{}) {
msg := fmt.Sprintf(format, v...)
l.logger.Error().Msg(msg)
}

// Debugf logs a debug statement
func (l *LogWrapper) Debugf(format string, v ...interface{}) {
msg := fmt.Sprintf(format, v...)
l.logger.Debug().Msg(msg)
}

// Tracef logs a trace statement
func (l *LogWrapper) Tracef(format string, v ...interface{}) {
msg := fmt.Sprintf(format, v...)
l.logger.Trace().Msg(msg)
}
17 changes: 17 additions & 0 deletions nats/pkg/server/nats/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package nats

import (
stanServer "github.com/nats-io/nats-streaming-server/server"
)

// RunNatsServer runs the nats streaming server
func RunNatsServer(opts ...Option) (*stanServer.StanServer, error) {
natsOpts := stanServer.DefaultNatsServerOptions
stanOpts := stanServer.GetDefaultOptions()

for _, o := range opts {
o(&natsOpts, stanOpts)
}
s, err := stanServer.RunServerWithOpts(stanOpts, &natsOpts)
return s, err
}
37 changes: 37 additions & 0 deletions nats/pkg/server/nats/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package nats

import (
natsServer "github.com/nats-io/nats-server/v2/server"
stanServer "github.com/nats-io/nats-streaming-server/server"
)

// Option configures the nats server
type Option func(*natsServer.Options, *stanServer.Options)

// Host sets the host URL for the nats server
func Host(url string) Option {
return func(no *natsServer.Options, _ *stanServer.Options) {
no.Host = url
}
}

// Port sets the host URL for the nats server
func Port(port int) Option {
return func(no *natsServer.Options, _ *stanServer.Options) {
no.Port = port
}
}

// NatsOpts allows setting Options from nats package directly
func NatsOpts(opt func(*natsServer.Options)) Option {
return func(no *natsServer.Options, _ *stanServer.Options) {
opt(no)
}
}

// StanOpts allows setting Options from stan package directly
func StanOpts(opt func(*stanServer.Options)) Option {
return func(_ *natsServer.Options, so *stanServer.Options) {
opt(so)
}
}

0 comments on commit 065a273

Please sign in to comment.