cloudapi: Refresh log tail connection
It makes the log tail connection more fault-tolerant:
  * it tries to acquire a new connection in case of error.
  * the dial operation retries a few times with exponential backoff before
    returning a final error (a minimal sketch of the idea follows the change
    summary below).
codebien committed Jul 26, 2021
1 parent a33b054 commit 64fd757
Showing 3 changed files with 314 additions and 20 deletions.
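Before the diffs, a minimal sketch of the approach the commit message describes: the dial is retried with exponential backoff, and a failed read triggers a fresh dial instead of ending the stream. This is illustrative only and not the code in this commit; the conn interface and the dialOnce callback are hypothetical stand-ins for the websocket connection and dialer.

package example

import (
	"context"
	"time"
)

// conn is a hypothetical stand-in for a *websocket.Conn.
type conn interface {
	ReadMessage() ([]byte, error)
}

// dialWithBackoff retries dialOnce up to three times, roughly doubling the
// wait between attempts, and returns the last dial error on failure.
func dialWithBackoff(ctx context.Context, dialOnce func(context.Context) (conn, error)) (conn, error) {
	wait := time.Second
	var lastErr error
	for attempt := 0; attempt < 3; attempt++ {
		if attempt > 0 {
			select {
			case <-time.After(wait):
				wait *= 2 // exponential backoff between attempts
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}
		c, err := dialOnce(ctx)
		if err == nil {
			return c, nil
		}
		lastErr = err
	}
	return nil, lastErr
}

// readLoop keeps reading messages; on a read error it tries to restore the
// stream with a new connection and only returns if the reconnect fails too.
func readLoop(ctx context.Context, dialOnce func(context.Context) (conn, error), handle func([]byte)) error {
	c, err := dialWithBackoff(ctx, dialOnce)
	if err != nil {
		return err
	}
	for {
		msg, err := c.ReadMessage()
		if err != nil {
			if fresh, dialErr := dialWithBackoff(ctx, dialOnce); dialErr == nil {
				c = fresh
				continue
			}
			return err // reconnect failed: surface the original read error
		}
		handle(msg)
	}
}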
8 changes: 6 additions & 2 deletions cloudapi/api_test.go
@@ -122,7 +122,9 @@ func TestDetailsError(t *testing.T) {
assert.EqualError(t, err, "(403) Validation failed\n name: Shorter than minimum length 2.")
}

func TestRetry(t *testing.T) {
func TestClientRetry(t *testing.T) {
t.Parallel()

called := 0
idempotencyKey := ""
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -146,7 +148,9 @@ func TestRetry(t *testing.T) {
assert.NotNil(t, err)
}

func TestRetrySuccessOnSecond(t *testing.T) {
func TestClientRetrySuccessOnSecond(t *testing.T) {
t.Parallel()

called := 1
idempotencyKey := ""
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
95 changes: 77 additions & 18 deletions cloudapi/logs.go
@@ -23,9 +23,12 @@ package cloudapi
import (
"context"
"fmt"
"math"
"math/rand"
"net/http"
"net/url"
"strconv"
"sync"
"time"

"github.com/gorilla/websocket"
@@ -92,43 +95,48 @@ func labelsToLogrusFields(labels map[string]string) logrus.Fields {
return fields
}

func (c *Config) getRequest(referenceID string, start time.Duration) (*url.URL, error) {
func (c *Config) logtailConn(ctx context.Context, referenceID string, since time.Time) (*websocket.Conn, error) {
u, err := url.Parse(c.LogsTailURL.String)
if err != nil {
return nil, fmt.Errorf("couldn't parse cloud logs host %w", err)
}

u.RawQuery = fmt.Sprintf(`query={test_run_id="%s"}&start=%d`,
referenceID,
time.Now().Add(-start).UnixNano(),
)
u.RawQuery = fmt.Sprintf(`query={test_run_id="%s"}&start=%d`, referenceID, since.UnixNano())

return u, nil
headers := make(http.Header)
headers.Add("Sec-WebSocket-Protocol", "token="+c.Token.String)

var conn *websocket.Conn
err = retry(sleeperFunc(time.Sleep), 3, 5*time.Second, 2*time.Minute, func() (err error) {
// We don't need to close the http body or use it for anything until we want to actually log
// what the server returned as body when it errors out
conn, _, err = websocket.DefaultDialer.DialContext(ctx, u.String(), headers) //nolint:bodyclose
return err
})
if err != nil {
return nil, err
}
return conn, nil
}

// StreamLogsToLogger streams the logs for the configured test to the provided logger until ctx is
// Done or an error occurs.
func (c *Config) StreamLogsToLogger(
ctx context.Context, logger logrus.FieldLogger, referenceID string, start time.Duration,
ctx context.Context, logger logrus.FieldLogger, referenceID string, tailFrom time.Duration,
) error {
u, err := c.getRequest(referenceID, start)
if err != nil {
return err
}

headers := make(http.Header)
headers.Add("Sec-WebSocket-Protocol", "token="+c.Token.String)
var mconn sync.Mutex

// We don't need to close the http body or use it for anything until we want to actually log
// what the server returned as body when it errors out
conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), headers) //nolint:bodyclose
conn, err := c.logtailConn(ctx, referenceID, time.Now().Add(-tailFrom))
if err != nil {
return err
}

go func() {
<-ctx.Done()

mconn.Lock()
defer mconn.Unlock()

_ = conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseGoingAway, "closing"),
@@ -138,7 +146,6 @@ func (c *Config) StreamLogsToLogger(
}()

msgBuffer := make(chan []byte, 10)

defer close(msgBuffer)

go func() {
@@ -166,6 +173,18 @@
if err != nil {
logger.WithError(err).Warn("error reading a message from the cloud")

// try to restore the stream establishing a new connection
logger.Warn("trying to establish a fresh connection with the tail logs, this might result in either some repeated or missed messages") //nolint:lll

newconn, errd := c.logtailConn(ctx, referenceID, time.Now())
if errd == nil {
mconn.Lock()
conn = newconn
mconn.Unlock()
continue
}

// return the main error
return err
}

@@ -176,3 +195,43 @@ func (c *Config) StreamLogsToLogger(
}
}
}

// sleeper represents an abstraction for waiting an amount of time.
type sleeper interface {
Sleep(d time.Duration)
}

// sleeperFunc uses the underlying function to implement the wait operation.
type sleeperFunc func(time.Duration)

func (sfn sleeperFunc) Sleep(d time.Duration) {
sfn(d)
}

// retry executes the provided function until it succeeds
// or the maximum number of attempts is hit. It waits between
// the latest attempt and the next retry.
// Interval is used as the base to compute an exponential backoff;
// if the computed wait exceeds the max interval then max is used.
func retry(s sleeper, attempts uint, interval, max time.Duration, do func() error) (err error) {
baseInterval := math.Abs(interval.Truncate(time.Second).Seconds())
r := rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec

for i := 0; i < int(attempts); i++ {
if i > 0 {
// wait = (interval ^ i) + random milliseconds
wait := time.Duration(math.Pow(baseInterval, float64(i))) * time.Second
wait += time.Duration(r.Int63n(1000)) * time.Millisecond

if wait > max {
wait = max
}
s.Sleep(wait)
}
err = do()
if err == nil {
return nil
}
}
return
}
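The retry helper above is easiest to exercise with a fake sleeper that records the waits instead of actually sleeping. The test-style sketch below is hypothetical (not part of this commit) and assumes it lives in the cloudapi package next to retry, with "fmt", "testing" and "time" imported. With interval=5s and max=2m, the recorded waits come out to roughly 5s and 25s (5^1 and 5^2 seconds, plus up to a second of random jitter), both below the 2-minute cap.

// Hypothetical usage sketch of retry with a recording sleeper; not part of
// this commit.
func TestRetryBackoffSketch(t *testing.T) {
	var waits []time.Duration
	recorder := sleeperFunc(func(d time.Duration) { waits = append(waits, d) })

	calls := 0
	err := retry(recorder, 3, 5*time.Second, 2*time.Minute, func() error {
		calls++
		if calls < 3 {
			return fmt.Errorf("transient failure %d", calls)
		}
		return nil // succeed on the third attempt
	})

	// Two failed attempts mean two recorded waits: about 5s and 25s
	// (5^1 and 5^2 seconds plus up to 1s of jitter).
	if err != nil || calls != 3 || len(waits) != 2 {
		t.Fatalf("err=%v calls=%d waits=%v", err, calls, waits)
	}
}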
