Skip to content

Commit

Permalink
fix: client app retry logic (#144)
Browse files Browse the repository at this point in the history
* fix: retry logic
  • Loading branch information
braginini authored Nov 1, 2021
1 parent 2c729fe commit d040cfe
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 104 deletions.
24 changes: 2 additions & 22 deletions client/cmd/service_controller.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cmd

import (
"github.com/cenkalti/backoff/v4"
"github.com/kardianos/service"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -11,31 +10,12 @@ import (

func (p *program) Start(s service.Service) error {

var backOff = &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}

// Start should not block. Do the actual work async.
log.Info("starting service") //nolint
go func() {
operation := func() error {
err := runClient()
if err != nil {
log.Warnf("retrying Wiretrustee client app due to error: %v", err)
return err
}
return nil
}

err := backoff.Retry(operation, backOff)
err := runClient()
if err != nil {
log.Errorf("exiting client retry loop due to unrecoverable error: %s", err)
log.Errorf("stopped Wiretrustee client app due to error: %v", err)
return
}
}()
Expand Down
157 changes: 90 additions & 67 deletions client/cmd/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"github.com/cenkalti/backoff/v4"
"github.com/kardianos/service"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -12,6 +13,7 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"time"
)

var (
Expand Down Expand Up @@ -117,86 +119,107 @@ func connectToManagement(ctx context.Context, managementAddr string, ourPrivateK
}

func runClient() error {
config, err := internal.ReadConfig(managementURL, configPath)
if err != nil {
log.Errorf("failed reading config %s %v", configPath, err)
return err
}
var backOff = &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: time.Hour,
MaxElapsedTime: 24 * 3 * time.Hour,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}

operation := func() error {

config, err := internal.ReadConfig(managementURL, configPath)
if err != nil {
log.Errorf("failed reading config %s %v", configPath, err)
return err
}

//validate our peer's Wireguard PRIVATE key
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
if err != nil {
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
//validate our peer's Wireguard PRIVATE key
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
if err != nil {
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mgmTlsEnabled := false
if config.ManagementURL.Scheme == "https" {
mgmTlsEnabled = true
}
mgmTlsEnabled := false
if config.ManagementURL.Scheme == "https" {
mgmTlsEnabled = true
}

// connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config
mgmClient, loginResp, err := connectToManagement(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled)
if err != nil {
log.Warn(err)
return err
}
// connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config
mgmClient, loginResp, err := connectToManagement(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled)
if err != nil {
log.Warn(err)
return err
}

// with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal
signalClient, err := connectToSignal(ctx, loginResp.GetWiretrusteeConfig(), myPrivateKey)
if err != nil {
log.Error(err)
return err
}
// with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal
signalClient, err := connectToSignal(ctx, loginResp.GetWiretrusteeConfig(), myPrivateKey)
if err != nil {
log.Error(err)
return err
}

peerConfig := loginResp.GetPeerConfig()
peerConfig := loginResp.GetPeerConfig()

engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig)
if err != nil {
log.Error(err)
return err
}
engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig)
if err != nil {
log.Error(err)
return err
}

// create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers.
engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx)
err = engine.Start()
if err != nil {
log.Errorf("error while starting Wiretrustee Connection Engine: %s", err)
return err
}
// create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers.
engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx)
err = engine.Start()
if err != nil {
log.Errorf("error while starting Wiretrustee Connection Engine: %s", err)
return err
}

log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address)
log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address)

select {
case <-stopCh:
case <-ctx.Done():
}
select {
case <-stopCh:
case <-ctx.Done():
}

err = mgmClient.Close()
if err != nil {
log.Errorf("failed closing Management Service client %v", err)
return err
}
err = signalClient.Close()
if err != nil {
log.Errorf("failed closing Signal Service client %v", err)
return err
}
backOff.Reset()

err = engine.Stop()
if err != nil {
log.Errorf("failed stopping engine %v", err)
return err
}
err = mgmClient.Close()
if err != nil {
log.Errorf("failed closing Management Service client %v", err)
return err
}
err = signalClient.Close()
if err != nil {
log.Errorf("failed closing Signal Service client %v", err)
return err
}

go func() {
cleanupCh <- struct{}{}
}()
err = engine.Stop()
if err != nil {
log.Errorf("failed stopping engine %v", err)
return err
}

log.Info("stopped Wiretrustee client")
go func() {
cleanupCh <- struct{}{}
}()

return ctx.Err()
log.Info("stopped Wiretrustee client")

return ctx.Err()
}

err := backoff.Retry(operation, backOff)
if err != nil {
log.Errorf("exiting client retry loop due to unrecoverable error: %s", err)
return err
}
return nil
}
10 changes: 4 additions & 6 deletions management/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
MaxInterval: 15 * time.Minute,
MaxElapsedTime: time.Hour, //stop after an hour of trying, the error will be propagated to the general retry of the client
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
Expand Down Expand Up @@ -103,12 +103,10 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
// blocking until error
err = c.receiveEvents(stream, *serverPubKey, msgHandler)
if err != nil {
/*if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.PermissionDenied {
//todo handle differently??
}*/
backOff.Reset()
return err
}
backOff.Reset()

return nil
}

Expand Down
23 changes: 14 additions & 9 deletions signal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
MaxInterval: 15 * time.Minute,
MaxElapsedTime: time.Hour, //stop after an hour of trying, the error will be propagated to the general retry of the client
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
Expand All @@ -101,14 +101,19 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) {

operation := func() error {

err := c.connect(c.key.PublicKey().String(), msgHandler)
stream, err := c.connect(c.key.PublicKey().String())
if err != nil {
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
c.connWg.Add(1)
return err
}

backOff.Reset()
err = c.receive(stream, msgHandler)
if err != nil {
backOff.Reset()
return err
}

return nil
}

Expand All @@ -120,7 +125,7 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) {
}()
}

func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error) error {
func (c *Client) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) {
c.stream = nil

// add key fingerprint to the request header to be identified on the server side
Expand All @@ -131,23 +136,23 @@ func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error)

c.stream = stream
if err != nil {
return err
return nil, err
}
// blocks
header, err := c.stream.Header()
if err != nil {
return err
return nil, err
}
registered := header.Get(proto.HeaderRegistered)
if len(registered) == 0 {
return fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
}
//connection established we are good to use the stream
c.connWg.Done()

log.Infof("connected to the Signal Exchange Stream")

return c.receive(stream, msgHandler)
return stream, nil
}

// WaitConnected waits until the client is connected to the message stream
Expand Down

0 comments on commit d040cfe

Please sign in to comment.