diff --git a/client/cmd/service_controller.go b/client/cmd/service_controller.go index ca20b484212..82873fcabb5 100644 --- a/client/cmd/service_controller.go +++ b/client/cmd/service_controller.go @@ -1,7 +1,6 @@ package cmd import ( - "github.com/cenkalti/backoff/v4" "github.com/kardianos/service" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -11,31 +10,12 @@ import ( func (p *program) Start(s service.Service) error { - var backOff = &backoff.ExponentialBackOff{ - InitialInterval: time.Second, - RandomizationFactor: backoff.DefaultRandomizationFactor, - Multiplier: backoff.DefaultMultiplier, - MaxInterval: 30 * time.Second, - MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying - Stop: backoff.Stop, - Clock: backoff.SystemClock, - } - // Start should not block. Do the actual work async. log.Info("starting service") //nolint go func() { - operation := func() error { - err := runClient() - if err != nil { - log.Warnf("retrying Wiretrustee client app due to error: %v", err) - return err - } - return nil - } - - err := backoff.Retry(operation, backOff) + err := runClient() if err != nil { - log.Errorf("exiting client retry loop due to unrecoverable error: %s", err) + log.Errorf("stopped Wiretrustee client app due to error: %v", err) return } }() diff --git a/client/cmd/up.go b/client/cmd/up.go index 181d585e5f6..48141d4901a 100644 --- a/client/cmd/up.go +++ b/client/cmd/up.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "github.com/cenkalti/backoff/v4" "github.com/kardianos/service" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -12,6 +13,7 @@ import ( "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "time" ) var ( @@ -117,86 +119,107 @@ func connectToManagement(ctx context.Context, managementAddr string, ourPrivateK } func runClient() error { - config, err := internal.ReadConfig(managementURL, configPath) - if err != nil { - log.Errorf("failed reading config %s %v", configPath, err) - return err - } + var backOff = &backoff.ExponentialBackOff{ + InitialInterval: time.Second, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: time.Hour, + MaxElapsedTime: 24 * 3 * time.Hour, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } + + operation := func() error { + + config, err := internal.ReadConfig(managementURL, configPath) + if err != nil { + log.Errorf("failed reading config %s %v", configPath, err) + return err + } - //validate our peer's Wireguard PRIVATE key - myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey) - if err != nil { - log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error()) - return err - } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + //validate our peer's Wireguard PRIVATE key + myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey) + if err != nil { + log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error()) + return err + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - mgmTlsEnabled := false - if config.ManagementURL.Scheme == "https" { - mgmTlsEnabled = true - } + mgmTlsEnabled := false + if config.ManagementURL.Scheme == "https" { + mgmTlsEnabled = true + } - // connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config - mgmClient, loginResp, err := connectToManagement(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled) - if err != nil { - log.Warn(err) - return err - } + // connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config + mgmClient, loginResp, err := connectToManagement(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled) + if err != nil { + log.Warn(err) + return err + } - // with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal - signalClient, err := connectToSignal(ctx, loginResp.GetWiretrusteeConfig(), myPrivateKey) - if err != nil { - log.Error(err) - return err - } + // with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal + signalClient, err := connectToSignal(ctx, loginResp.GetWiretrusteeConfig(), myPrivateKey) + if err != nil { + log.Error(err) + return err + } - peerConfig := loginResp.GetPeerConfig() + peerConfig := loginResp.GetPeerConfig() - engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig) - if err != nil { - log.Error(err) - return err - } + engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig) + if err != nil { + log.Error(err) + return err + } - // create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers. - engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx) - err = engine.Start() - if err != nil { - log.Errorf("error while starting Wiretrustee Connection Engine: %s", err) - return err - } + // create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers. + engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx) + err = engine.Start() + if err != nil { + log.Errorf("error while starting Wiretrustee Connection Engine: %s", err) + return err + } - log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address) + log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address) - select { - case <-stopCh: - case <-ctx.Done(): - } + select { + case <-stopCh: + case <-ctx.Done(): + } - err = mgmClient.Close() - if err != nil { - log.Errorf("failed closing Management Service client %v", err) - return err - } - err = signalClient.Close() - if err != nil { - log.Errorf("failed closing Signal Service client %v", err) - return err - } + backOff.Reset() - err = engine.Stop() - if err != nil { - log.Errorf("failed stopping engine %v", err) - return err - } + err = mgmClient.Close() + if err != nil { + log.Errorf("failed closing Management Service client %v", err) + return err + } + err = signalClient.Close() + if err != nil { + log.Errorf("failed closing Signal Service client %v", err) + return err + } - go func() { - cleanupCh <- struct{}{} - }() + err = engine.Stop() + if err != nil { + log.Errorf("failed stopping engine %v", err) + return err + } - log.Info("stopped Wiretrustee client") + go func() { + cleanupCh <- struct{}{} + }() - return ctx.Err() + log.Info("stopped Wiretrustee client") + return ctx.Err() + } + + err := backoff.Retry(operation, backOff) + if err != nil { + log.Errorf("exiting client retry loop due to unrecoverable error: %s", err) + return err + } + return nil } diff --git a/management/client/client.go b/management/client/client.go index 019c8668b24..51a6b7b8703 100644 --- a/management/client/client.go +++ b/management/client/client.go @@ -70,8 +70,8 @@ func defaultBackoff(ctx context.Context) backoff.BackOff { InitialInterval: 800 * time.Millisecond, RandomizationFactor: backoff.DefaultRandomizationFactor, Multiplier: backoff.DefaultMultiplier, - MaxInterval: 30 * time.Second, - MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying + MaxInterval: 15 * time.Minute, + MaxElapsedTime: time.Hour, //stop after an hour of trying, the error will be propagated to the general retry of the client Stop: backoff.Stop, Clock: backoff.SystemClock, }, ctx) @@ -103,12 +103,10 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error { // blocking until error err = c.receiveEvents(stream, *serverPubKey, msgHandler) if err != nil { - /*if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.PermissionDenied { - //todo handle differently?? - }*/ + backOff.Reset() return err } - backOff.Reset() + return nil } diff --git a/signal/client/client.go b/signal/client/client.go index 5b19c3c1fe8..5702a8c1c18 100644 --- a/signal/client/client.go +++ b/signal/client/client.go @@ -81,8 +81,8 @@ func defaultBackoff(ctx context.Context) backoff.BackOff { InitialInterval: 800 * time.Millisecond, RandomizationFactor: backoff.DefaultRandomizationFactor, Multiplier: backoff.DefaultMultiplier, - MaxInterval: 30 * time.Second, - MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying + MaxInterval: 15 * time.Minute, + MaxElapsedTime: time.Hour, //stop after an hour of trying, the error will be propagated to the general retry of the client Stop: backoff.Stop, Clock: backoff.SystemClock, }, ctx) @@ -101,14 +101,19 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) { operation := func() error { - err := c.connect(c.key.PublicKey().String(), msgHandler) + stream, err := c.connect(c.key.PublicKey().String()) if err != nil { log.Warnf("disconnected from the Signal Exchange due to an error: %v", err) c.connWg.Add(1) return err } - backOff.Reset() + err = c.receive(stream, msgHandler) + if err != nil { + backOff.Reset() + return err + } + return nil } @@ -120,7 +125,7 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) { }() } -func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error) error { +func (c *Client) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) { c.stream = nil // add key fingerprint to the request header to be identified on the server side @@ -131,23 +136,23 @@ func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error) c.stream = stream if err != nil { - return err + return nil, err } // blocks header, err := c.stream.Header() if err != nil { - return err + return nil, err } registered := header.Get(proto.HeaderRegistered) if len(registered) == 0 { - return fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams") + return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams") } //connection established we are good to use the stream c.connWg.Done() log.Infof("connected to the Signal Exchange Stream") - return c.receive(stream, msgHandler) + return stream, nil } // WaitConnected waits until the client is connected to the message stream