cloudapi: Refresh log tail connection
It makes the log tail connection more fault-tolerant:
  * it tries to acquire a new connection in case of error.
  * the dial operation retries a few times with exponential backoff before
    returning a final error.
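
For orientation before the diff, here is a minimal, self-contained sketch of the pattern the commit introduces: dial with a bounded number of attempts, waiting longer after each failure, and give up with the last error once the attempts are exhausted. Everything in it (dialWithBackoff, the doubling schedule, the constants) is illustrative only; the actual implementation in cloudapi/logs.go below uses a power-of-interval schedule with random jitter and a configurable cap.

package main

import (
	"errors"
	"fmt"
	"time"
)

// dialWithBackoff calls dial up to attempts times, doubling the wait between
// attempts and capping it at max. It returns the last error if all attempts fail.
func dialWithBackoff(dial func() error, attempts int, base, max time.Duration) error {
	var err error
	wait := base
	for i := 0; i < attempts; i++ {
		if i > 0 {
			time.Sleep(wait)
			wait *= 2
			if wait > max {
				wait = max
			}
		}
		if err = dial(); err == nil {
			return nil
		}
	}
	return fmt.Errorf("all %d dial attempts failed: %w", attempts, err)
}

func main() {
	calls := 0
	err := dialWithBackoff(func() error {
		calls++
		if calls < 3 {
			return errors.New("connection refused")
		}
		return nil
	}, 3, 100*time.Millisecond, time.Second)
	fmt.Println(calls, err) // 3 <nil>: the third attempt succeeds
}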
codebien committed Aug 25, 2021
1 parent 34a7743 commit c932a28
Showing 3 changed files with 424 additions and 29 deletions.
8 changes: 6 additions & 2 deletions cloudapi/api_test.go
@@ -122,7 +122,9 @@ func TestDetailsError(t *testing.T) {
assert.EqualError(t, err, "(403) Validation failed\n name: Shorter than minimum length 2.")
}

func TestRetry(t *testing.T) {
func TestClientRetry(t *testing.T) {
t.Parallel()

called := 0
idempotencyKey := ""
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -146,7 +148,9 @@ func TestRetry(t *testing.T) {
assert.NotNil(t, err)
}

func TestRetrySuccessOnSecond(t *testing.T) {
func TestClientRetrySuccessOnSecond(t *testing.T) {
t.Parallel()

called := 1
idempotencyKey := ""
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
142 changes: 115 additions & 27 deletions cloudapi/logs.go
@@ -23,9 +23,13 @@ package cloudapi
import (
"context"
"fmt"
"math"
"math/rand"
"net/http"
"net/url"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
@@ -53,8 +57,11 @@ type msgDroppedEntries struct {
Timestamp string `json:"timestamp"`
}

func (m *msg) Log(logger logrus.FieldLogger) {
// Log writes the Streams and Dropped Entries to the passed logger.
// It returns the most recent timestamp seen across all messages.
func (m *msg) Log(logger logrus.FieldLogger) int64 {
var level string
var ts int64

for _, stream := range m.Streams {
fields := labelsToLogrusFields(stream.Stream)
@@ -64,22 +71,33 @@ func (m *msg) Log(logger logrus.FieldLogger) {
}

for _, value := range stream.Values {
nsec, _ := strconv.Atoi(value[0])
e := logger.WithFields(fields).WithTime(time.Unix(0, int64(nsec)))
nsec, _ := strconv.ParseInt(value[0], 10, 64)
e := logger.WithFields(fields).WithTime(time.Unix(0, nsec))
lvl, err := logrus.ParseLevel(level)
if err != nil {
e.Info(value[1])
e.Warn("last message had unknown level " + level)
} else {
e.Log(lvl, value[1])
}

// find the latest seen message
if nsec > ts {
ts = nsec
}
}
}

for _, dropped := range m.DroppedEntries {
nsec, _ := strconv.Atoi(dropped.Timestamp)
logger.WithFields(labelsToLogrusFields(dropped.Labels)).WithTime(time.Unix(0, int64(nsec))).Warn("dropped")
nsec, _ := strconv.ParseInt(dropped.Timestamp, 10, 64)
logger.WithFields(labelsToLogrusFields(dropped.Labels)).WithTime(time.Unix(0, nsec)).Warn("dropped")

if nsec > ts {
ts = nsec
}
}

return ts
}

func labelsToLogrusFields(labels map[string]string) logrus.Fields {
@@ -92,43 +110,48 @@ func labelsToLogrusFields(labels map[string]string) logrus.Fields {
return fields
}

func (c *Config) getRequest(referenceID string, start time.Duration) (*url.URL, error) {
func (c *Config) logtailConn(ctx context.Context, referenceID string, since time.Time) (*websocket.Conn, error) {
u, err := url.Parse(c.LogsTailURL.String)
if err != nil {
return nil, fmt.Errorf("couldn't parse cloud logs host %w", err)
}

u.RawQuery = fmt.Sprintf(`query={test_run_id="%s"}&start=%d`,
referenceID,
time.Now().Add(-start).UnixNano(),
)
u.RawQuery = fmt.Sprintf(`query={test_run_id="%s"}&start=%d`, referenceID, since.UnixNano())

return u, nil
headers := make(http.Header)
headers.Add("Sec-WebSocket-Protocol", "token="+c.Token.String)

var conn *websocket.Conn
err = retry(sleeperFunc(time.Sleep), 3, 5*time.Second, 2*time.Minute, func() (err error) {
// We don't need to close the http body or use it for anything until we want to actually log
// what the server returned as body when it errors out
conn, _, err = websocket.DefaultDialer.DialContext(ctx, u.String(), headers) //nolint:bodyclose
return err
})
if err != nil {
return nil, err
}
return conn, nil
}

// StreamLogsToLogger streams the logs for the configured test to the provided logger until ctx is
// Done or an error occurs.
func (c *Config) StreamLogsToLogger(
ctx context.Context, logger logrus.FieldLogger, referenceID string, start time.Duration,
ctx context.Context, logger logrus.FieldLogger, referenceID string, tailFrom time.Duration,
) error {
u, err := c.getRequest(referenceID, start)
if err != nil {
return err
}
var mconn sync.Mutex

headers := make(http.Header)
headers.Add("Sec-WebSocket-Protocol", "token="+c.Token.String)

// We don't need to close the http body or use it for anything until we want to actually log
// what the server returned as body when it errors out
conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), headers) //nolint:bodyclose
conn, err := c.logtailConn(ctx, referenceID, time.Now().Add(-tailFrom))
if err != nil {
return err
}

go func() {
<-ctx.Done()

mconn.Lock()
defer mconn.Unlock()

_ = conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseGoingAway, "closing"),
@@ -138,9 +161,9 @@ func (c *Config) StreamLogsToLogger(
}()

msgBuffer := make(chan []byte, 10)

defer close(msgBuffer)

var mostRecent int64
go func() {
for message := range msgBuffer {
var m msg
@@ -150,8 +173,8 @@ func (c *Config) StreamLogsToLogger(

continue
}

m.Log(logger)
ts := m.Log(logger)
atomic.StoreInt64(&mostRecent, ts)
}
}()

@@ -164,9 +187,34 @@ func (c *Config) StreamLogsToLogger(
}

if err != nil {
logger.WithError(err).Warn("error reading a message from the cloud")
logger.WithError(err).Warn("error reading a log message from the cloud, trying to establish a fresh connection with the logs service...") //nolint:lll

return err
var since time.Time
if ts := atomic.LoadInt64(&mostRecent); ts > 0 {
// add 1ns to avoid a possible repetition
since = time.Unix(0, ts).Add(time.Nanosecond)
} else {
since = time.Now()
}

// TODO: avoid the "logical" race condition
// The case explained:
// * The msgBuffer consumer is slow
// * ReadMessage is fast and adds at least one more message to the buffer
// * An error occurs in the meantime and the re-dialing procedure is attempted
// * Then the latest timestamp used will not be the real latest received,
//   because it is still waiting to be processed.
// If the connection is then restored, the first message will be a duplicate.
newconn, errd := c.logtailConn(ctx, referenceID, since)
if errd != nil {
// return the main error
return err
}

mconn.Lock()
conn = newconn
mconn.Unlock()
continue
}

select {
@@ -176,3 +224,43 @@ func (c *Config) StreamLogsToLogger(
}
}
}

// sleeper represents an abstraction for waiting an amount of time.
type sleeper interface {
Sleep(d time.Duration)
}

// sleeperFunc adapts a plain function to the sleeper interface, using it to implement the wait operation.
type sleeperFunc func(time.Duration)

func (sfn sleeperFunc) Sleep(d time.Duration) {
sfn(d)
}

// retry executes the provided function until it succeeds
// or the maximum number of attempts is reached, waiting between
// one attempt and the next.
// interval is used as the base for computing an exponential backoff;
// if the computed wait exceeds max then max is used instead.
func retry(s sleeper, attempts uint, interval, max time.Duration, do func() error) (err error) {
baseInterval := math.Abs(interval.Truncate(time.Second).Seconds())
r := rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec

for i := 0; i < int(attempts); i++ {
if i > 0 {
// wait = (interval ^ i) + random milliseconds
wait := time.Duration(math.Pow(baseInterval, float64(i))) * time.Second
wait += time.Duration(r.Int63n(1000)) * time.Millisecond

if wait > max {
wait = max
}
s.Sleep(wait)
}
err = do()
if err == nil {
return nil
}
}
return
}
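
The sleeper indirection above exists so the backoff can be exercised in tests without real waiting; the commit's own tests live in the third changed file, whose diff did not load on this page. As a rough idea of the schedule: with interval=5s and max=2m, the waits between the three dial attempts are about 5s (5^1) and 25s (5^2), each plus up to a second of random jitter, both below the cap. The sketch below only illustrates how retry could be driven with a recording sleeper; recordingSleeper and TestRetryBackoffSchedule are assumptions for this example, not part of the commit, and it assumes it sits in package cloudapi with the usual testify import.

package cloudapi

import (
	"errors"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// recordingSleeper is a hypothetical test double: it records the requested
// waits instead of blocking, so the backoff schedule can be asserted quickly.
type recordingSleeper struct {
	waits []time.Duration
}

func (s *recordingSleeper) Sleep(d time.Duration) {
	s.waits = append(s.waits, d)
}

func TestRetryBackoffSchedule(t *testing.T) {
	t.Parallel()

	s := &recordingSleeper{}
	calls := 0
	err := retry(s, 3, 5*time.Second, 2*time.Minute, func() error {
		calls++
		return errors.New("dial failed")
	})

	// all three attempts fail, so the last error is returned
	assert.Error(t, err)
	assert.Equal(t, 3, calls)
	// two waits between three attempts: ~5s and ~25s (plus jitter), under the 2m cap
	assert.Len(t, s.waits, 2)
	assert.GreaterOrEqual(t, s.waits[0], 5*time.Second)
	assert.GreaterOrEqual(t, s.waits[1], 25*time.Second)
}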
