Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: client app retry logic #144

Merged
merged 2 commits into from
Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 2 additions & 22 deletions client/cmd/service_controller.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cmd

import (
"github.com/cenkalti/backoff/v4"
"github.com/kardianos/service"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -11,31 +10,12 @@ import (

func (p *program) Start(s service.Service) error {

var backOff = &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}

// Start should not block. Do the actual work async.
log.Info("starting service") //nolint
go func() {
operation := func() error {
err := runClient()
if err != nil {
log.Warnf("retrying Wiretrustee client app due to error: %v", err)
return err
}
return nil
}

err := backoff.Retry(operation, backOff)
err := runClient()
if err != nil {
log.Errorf("exiting client retry loop due to unrecoverable error: %s", err)
log.Errorf("stopped Wiretrustee client app due to error: %v", err)
return
}
}()
Expand Down
157 changes: 90 additions & 67 deletions client/cmd/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"github.com/cenkalti/backoff/v4"
"github.com/kardianos/service"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -12,6 +13,7 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"time"
)

var (
Expand Down Expand Up @@ -117,86 +119,107 @@ func connectToManagement(ctx context.Context, managementAddr string, ourPrivateK
}

func runClient() error {
config, err := internal.ReadConfig(managementURL, configPath)
if err != nil {
log.Errorf("failed reading config %s %v", configPath, err)
return err
}
var backOff = &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: time.Hour,
MaxElapsedTime: 24 * 3 * time.Hour,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}

operation := func() error {

config, err := internal.ReadConfig(managementURL, configPath)
if err != nil {
log.Errorf("failed reading config %s %v", configPath, err)
return err
}

//validate our peer's Wireguard PRIVATE key
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
if err != nil {
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
//validate our peer's Wireguard PRIVATE key
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
if err != nil {
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mgmTlsEnabled := false
if config.ManagementURL.Scheme == "https" {
mgmTlsEnabled = true
}
mgmTlsEnabled := false
if config.ManagementURL.Scheme == "https" {
mgmTlsEnabled = true
}

// connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config
mgmClient, loginResp, err := connectToManagement(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled)
if err != nil {
log.Warn(err)
return err
}
// connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config
mgmClient, loginResp, err := connectToManagement(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled)
if err != nil {
log.Warn(err)
return err
}

// with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal
signalClient, err := connectToSignal(ctx, loginResp.GetWiretrusteeConfig(), myPrivateKey)
if err != nil {
log.Error(err)
return err
}
// with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal
signalClient, err := connectToSignal(ctx, loginResp.GetWiretrusteeConfig(), myPrivateKey)
if err != nil {
log.Error(err)
return err
}

peerConfig := loginResp.GetPeerConfig()
peerConfig := loginResp.GetPeerConfig()

engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig)
if err != nil {
log.Error(err)
return err
}
engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig)
if err != nil {
log.Error(err)
return err
}

// create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers.
engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx)
err = engine.Start()
if err != nil {
log.Errorf("error while starting Wiretrustee Connection Engine: %s", err)
return err
}
// create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers.
engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx)
err = engine.Start()
if err != nil {
log.Errorf("error while starting Wiretrustee Connection Engine: %s", err)
return err
}

log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address)
log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address)

select {
case <-stopCh:
case <-ctx.Done():
}
select {
case <-stopCh:
case <-ctx.Done():
}

err = mgmClient.Close()
if err != nil {
log.Errorf("failed closing Management Service client %v", err)
return err
}
err = signalClient.Close()
if err != nil {
log.Errorf("failed closing Signal Service client %v", err)
return err
}
backOff.Reset()

err = engine.Stop()
if err != nil {
log.Errorf("failed stopping engine %v", err)
return err
}
err = mgmClient.Close()
if err != nil {
log.Errorf("failed closing Management Service client %v", err)
return err
}
err = signalClient.Close()
if err != nil {
log.Errorf("failed closing Signal Service client %v", err)
return err
}

go func() {
cleanupCh <- struct{}{}
}()
err = engine.Stop()
if err != nil {
log.Errorf("failed stopping engine %v", err)
return err
}

log.Info("stopped Wiretrustee client")
go func() {
cleanupCh <- struct{}{}
}()

return ctx.Err()
log.Info("stopped Wiretrustee client")

return ctx.Err()
}

err := backoff.Retry(operation, backOff)
if err != nil {
log.Errorf("exiting client retry loop due to unrecoverable error: %s", err)
return err
}
return nil
}
10 changes: 4 additions & 6 deletions management/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
MaxInterval: 15 * time.Minute,
MaxElapsedTime: time.Hour, //stop after an hour of trying, the error will be propagated to the general retry of the client
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
Expand Down Expand Up @@ -103,12 +103,10 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
// blocking until error
err = c.receiveEvents(stream, *serverPubKey, msgHandler)
if err != nil {
/*if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.PermissionDenied {
//todo handle differently??
}*/
backOff.Reset()
return err
}
backOff.Reset()

return nil
}

Expand Down
23 changes: 14 additions & 9 deletions signal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
MaxInterval: 15 * time.Minute,
MaxElapsedTime: time.Hour, //stop after an hour of trying, the error will be propagated to the general retry of the client
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
Expand All @@ -101,14 +101,19 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) {

operation := func() error {

err := c.connect(c.key.PublicKey().String(), msgHandler)
stream, err := c.connect(c.key.PublicKey().String())
if err != nil {
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
c.connWg.Add(1)
return err
}

backOff.Reset()
err = c.receive(stream, msgHandler)
mlsmaycon marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
backOff.Reset()
return err
}

return nil
}

Expand All @@ -120,7 +125,7 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) {
}()
}

func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error) error {
func (c *Client) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) {
c.stream = nil

// add key fingerprint to the request header to be identified on the server side
Expand All @@ -131,23 +136,23 @@ func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error)

c.stream = stream
if err != nil {
return err
return nil, err
}
// blocks
header, err := c.stream.Header()
if err != nil {
return err
return nil, err
}
registered := header.Get(proto.HeaderRegistered)
if len(registered) == 0 {
return fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
}
//connection established we are good to use the stream
c.connWg.Done()

log.Infof("connected to the Signal Exchange Stream")

return c.receive(stream, msgHandler)
return stream, nil
}

// WaitConnected waits until the client is connected to the message stream
Expand Down