Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
cloudapi: Refresh log tail connection
Browse files Browse the repository at this point in the history
It makes the log tail connection more fault-tolerant:
  * it tries to acquire a new connection in case of error.
  * it allows setting a config option to refresh the connection at
    a defined interval, which avoids hitting the timeout set
    by the log provider.
  * the dial operation is retried a number of times, based on config
    options, before returning an error.
codebien committed Jul 26, 2021
1 parent a33b054 commit 31b0be0
Showing 3 changed files with 314 additions and 20 deletions.
8 changes: 6 additions & 2 deletions cloudapi/api_test.go
Original file line number Diff line number Diff line change
@@ -122,7 +122,9 @@ func TestDetailsError(t *testing.T) {
assert.EqualError(t, err, "(403) Validation failed\n name: Shorter than minimum length 2.")
}

func TestRetry(t *testing.T) {
func TestClientRetry(t *testing.T) {
t.Parallel()

called := 0
idempotencyKey := ""
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -146,7 +148,9 @@ func TestRetry(t *testing.T) {
assert.NotNil(t, err)
}

func TestRetrySuccessOnSecond(t *testing.T) {
func TestClientRetrySuccessOnSecond(t *testing.T) {
t.Parallel()

called := 1
idempotencyKey := ""
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
95 changes: 77 additions & 18 deletions cloudapi/logs.go
Original file line number Diff line number Diff line change
@@ -23,9 +23,12 @@ package cloudapi
import (
"context"
"fmt"
"math"
"math/rand"
"net/http"
"net/url"
"strconv"
"sync"
"time"

"github.com/gorilla/websocket"
@@ -92,43 +95,48 @@ func labelsToLogrusFields(labels map[string]string) logrus.Fields {
return fields
}

func (c *Config) getRequest(referenceID string, start time.Duration) (*url.URL, error) {
func (c *Config) logtailConn(ctx context.Context, referenceID string, since time.Time) (*websocket.Conn, error) {
u, err := url.Parse(c.LogsTailURL.String)
if err != nil {
return nil, fmt.Errorf("couldn't parse cloud logs host %w", err)
}

u.RawQuery = fmt.Sprintf(`query={test_run_id="%s"}&start=%d`,
referenceID,
time.Now().Add(-start).UnixNano(),
)
u.RawQuery = fmt.Sprintf(`query={test_run_id="%s"}&start=%d`, referenceID, since.UnixNano())

return u, nil
headers := make(http.Header)
headers.Add("Sec-WebSocket-Protocol", "token="+c.Token.String)

var conn *websocket.Conn
err = retry(sleeperFunc(time.Sleep), 3, 5*time.Second, 2*time.Minute, func() (err error) {
// We don't need to close the http body or use it for anything until we want to actually log
// what the server returned as body when it errors out
conn, _, err = websocket.DefaultDialer.DialContext(ctx, u.String(), headers) //nolint:bodyclose
return err
})
if err != nil {
return nil, err
}
return conn, nil
}

// StreamLogsToLogger streams the logs for the configured test to the provided logger until ctx is
// Done or an error occurs.
func (c *Config) StreamLogsToLogger(
ctx context.Context, logger logrus.FieldLogger, referenceID string, start time.Duration,
ctx context.Context, logger logrus.FieldLogger, referenceID string, tailFrom time.Duration,
) error {
u, err := c.getRequest(referenceID, start)
if err != nil {
return err
}

headers := make(http.Header)
headers.Add("Sec-WebSocket-Protocol", "token="+c.Token.String)
var mconn sync.Mutex

// We don't need to close the http body or use it for anything until we want to actually log
// what the server returned as body when it errors out
conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), headers) //nolint:bodyclose
conn, err := c.logtailConn(ctx, referenceID, time.Now().Add(-tailFrom))
if err != nil {
return err
}

go func() {
<-ctx.Done()

mconn.Lock()
defer mconn.Unlock()

_ = conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseGoingAway, "closing"),
@@ -138,7 +146,6 @@ func (c *Config) StreamLogsToLogger(
}()

msgBuffer := make(chan []byte, 10)

defer close(msgBuffer)

go func() {
@@ -166,6 +173,18 @@ func (c *Config) StreamLogsToLogger(
if err != nil {
logger.WithError(err).Warn("error reading a message from the cloud")

// try to restore the stream establishing a new connection
logger.Warn("trying to establish a fresh connection with the tail logs, this might result in either some repeated or missed messages") //nolint:lll

mconn.Lock()
var errd error
conn, errd = c.logtailConn(ctx, referenceID, time.Now())
mconn.Unlock()
if errd == nil {
continue
}

// return the main error
return err
}

@@ -176,3 +195,43 @@ func (c *Config) StreamLogsToLogger(
}
}
}

// sleeper abstracts the act of pausing for a given amount of time, so the
// waiting strategy can be swapped out by the caller.
type sleeper interface {
	Sleep(d time.Duration)
}

// sleeperFunc adapts a plain func(time.Duration) so that it satisfies sleeper.
type sleeperFunc func(time.Duration)

// Sleep forwards d to the wrapped function.
func (f sleeperFunc) Sleep(d time.Duration) { f(d) }

// retry executes do until it succeeds or the maximum number of attempts is
// reached. It returns nil as soon as an attempt succeeds, otherwise the error
// returned by the last attempt. If attempts is zero, do is never invoked and
// nil is returned.
//
// The first attempt is made immediately. Before the i-th retry (i >= 1) it
// asks s to sleep for an exponential backoff of (interval, truncated to whole
// seconds)^i seconds plus a random jitter below one second. If the computed
// wait exceeds max, then max is used instead.
func retry(s sleeper, attempts uint, interval, max time.Duration, do func() error) (err error) {
	baseInterval := math.Abs(interval.Truncate(time.Second).Seconds())
	r := rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec

	for i := uint(0); i < attempts; i++ {
		if i > 0 {
			// wait = (interval ^ i) + random milliseconds, capped at max.
			//
			// The cap is checked on the floating-point number of seconds
			// first: converting a huge power to time.Duration would overflow
			// int64, produce a negative wait and silently disable the backoff.
			wait := max
			if secs := math.Pow(baseInterval, float64(i)); secs < max.Seconds() {
				backoff := time.Duration(secs) * time.Second
				jitter := time.Duration(r.Int63n(1000)) * time.Millisecond
				// Written as a subtraction so the sum cannot overflow either.
				if backoff < max-jitter {
					wait = backoff + jitter
				}
			}
			s.Sleep(wait)
		}

		err = do()
		if err == nil {
			return nil
		}
	}
	return err
}
231 changes: 231 additions & 0 deletions cloudapi/logs_test.go
Original file line number Diff line number Diff line change
@@ -21,16 +21,27 @@
package cloudapi

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/gorilla/websocket"
"github.com/mailru/easyjson"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/httpmultibin"
)

func TestMsgParsing(t *testing.T) {
@@ -123,3 +134,223 @@ func TestMSGLog(t *testing.T) {
require.Equal(t, expectTime, entry.Time)
}
}

// TestRetry checks the retry helper: the "Success" cases verify how many
// times the callback is invoked and which waits are requested from the
// sleeper, the "Fail" case verifies that the callback's error is returned
// once every attempt has failed.
func TestRetry(t *testing.T) {
	t.Parallel()

	t.Run("Success", func(t *testing.T) {
		t.Parallel()

		tests := []struct {
			name     string
			attempts int             // the callback succeeds on this attempt
			expWaits []time.Duration // pow(abs(interval), attempt index)
		}{
			{
				name:     "NoRetry",
				attempts: 1,
			},
			{
				name:     "TwoAttempts",
				attempts: 2,
				expWaits: []time.Duration{5 * time.Second},
			},
			{
				name:     "MaximumExceeded",
				attempts: 4,
				expWaits: []time.Duration{5 * time.Second, 25 * time.Second, 2 * time.Minute},
			},
			{
				name:     "AttemptsLimit",
				attempts: 5,
				expWaits: []time.Duration{5 * time.Second, 25 * time.Second, 2 * time.Minute, 2 * time.Minute},
			},
		}

		for _, tt := range tests {
			t.Run(tt.name, func(t *testing.T) {
				var sleepRequests []time.Duration
				// sleepCollector tracks the request duration value for sleep requests.
				sleepCollector := sleeperFunc(func(d time.Duration) {
					sleepRequests = append(sleepRequests, d)
				})

				var iterations int
				err := retry(sleepCollector, 5, 5*time.Second, 2*time.Minute, func() error {
					iterations++
					if iterations < tt.attempts {
						return fmt.Errorf("unexpected error")
					}
					return nil
				})
				require.NoError(t, err)
				require.Equal(t, tt.attempts, iterations)
				require.Equal(t, len(tt.expWaits), len(sleepRequests))

				// the added random milliseconds make it difficult to know the exact value
				// so it asserts that expwait <= actual <= expwait + 1s
				for i, expwait := range tt.expWaits {
					assert.GreaterOrEqual(t, sleepRequests[i], expwait)
					assert.LessOrEqual(t, sleepRequests[i], expwait+(1*time.Second))
				}
			})
		}
	})
	t.Run("Fail", func(t *testing.T) {
		t.Parallel()

		mock := sleeperFunc(func(time.Duration) { /* noop - nowait */ })
		err := retry(mock, 5, 5*time.Second, 30*time.Second, func() error {
			return fmt.Errorf("unexpected error")
		})

		// EqualError compares the message; assert.Error would treat the string
		// as a failure description and only check that err is non-nil.
		assert.EqualError(t, err, "unexpected error")
	})
}

// TestStreamLogsToLogger runs Config.StreamLogsToLogger against a local
// websocket endpoint registered on an httpmultibin server.
//
// "Success" checks that the log lines written by the server are emitted by the
// logger. "RestoreConn" checks that, after the server closes the first
// connection, a second connection is requested with a later `start` query
// value and that the log line sent on it is emitted.
func TestStreamLogsToLogger(t *testing.T) {
t.Parallel()

// It registers a handler for the logtail endpoint.
// It upgrades the HTTP request to a websocket connection, invokes the
// provided callback and then closes the connection.
logtailHandleFunc := func(tb *httpmultibin.HTTPMultiBin, fn func(*websocket.Conn, *http.Request)) {
upgrader := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
tb.Mux.HandleFunc("/api/v1/tail", func(w http.ResponseWriter, req *http.Request) {
conn, err := upgrader.Upgrade(w, req, nil)
// NOTE(review): this handler presumably runs on a server goroutine rather
// than the test goroutine, where require's fail-now behaviour may not stop
// the test as intended - confirm.
require.NoError(t, err)

fn(conn, req)
_ = conn.Close()
})
}

// a basic config with the logtail endpoint set to the ws:// form of the
// test server URL
configFromHTTPMultiBin := func(tb *httpmultibin.HTTPMultiBin) Config {
wsurl := strings.TrimPrefix(tb.ServerHTTP.URL, "http://")
return Config{
// NOTE(review): the second argument (false) presumably marks the value as
// not valid; verify the code under test only reads the raw string.
LogsTailURL: null.NewString(fmt.Sprintf("ws://%s/api/v1/tail", wsurl), false),
}
}

// get the messages of all the entries drained from the mocked logger
logLines := func(hook *testutils.SimpleLogrusHook) (lines []string) {
for _, e := range hook.Drain() {
lines = append(lines, e.Message)
}
return
}

t.Run("Success", func(t *testing.T) {
t.Parallel()

// the timeout bounds the test; the handler cancels ctx itself once the
// messages are written
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

tb := httpmultibin.NewHTTPMultiBin(t)
logtailHandleFunc(tb, func(conn *websocket.Conn, _ *http.Request) {
// the two %q verbs are filled with the value of "key1" and the log line
msgfmt := `{"streams":[{"stream":{"key1":%q,"level":"warn"},"values":[["1598282752000000000",%q]]}],"dropped_entities":[]}`

b := json.RawMessage(fmt.Sprintf(msgfmt, "value1", "logline1"))
err := conn.WriteJSON(b)
require.NoError(t, err)

b = json.RawMessage([]byte(fmt.Sprintf(msgfmt, "value2", "logline2")))
err = conn.WriteJSON(b)
require.NoError(t, err)

// wait for the flush on the network, then cancel the context passed to
// StreamLogsToLogger
time.Sleep(time.Millisecond)
cancel()
})

// a logger that discards its output and only records entries in the hook
logger := logrus.New()
logger.Out = ioutil.Discard
hook := &testutils.SimpleLogrusHook{HookedLevels: logrus.AllLevels}
logger.AddHook(hook)

c := configFromHTTPMultiBin(tb)
err := c.StreamLogsToLogger(ctx, logger, "ref_id", 0)
require.NoError(t, err)

assert.Equal(t, []string{"logline1", "logline2"}, logLines(hook))
})

t.Run("RestoreConn", func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// startFilter parses the `start` query parameter of the request URL as
// nanoseconds since the Unix epoch.
startFilter := func(u url.URL) (start time.Time, err error) {
rawstart, err := strconv.ParseInt(u.Query().Get("start"), 10, 64)
if err != nil {
return start, err
}

start = time.Unix(0, rawstart)
return
}

// firstAttempt holds the `start` value of the first connection; m guards it
// because the handler invocations access it
var (
firstAttempt time.Time
m sync.Mutex
)

tb := httpmultibin.NewHTTPMultiBin(t)
logtailHandleFunc(tb, func(conn *websocket.Conn, req *http.Request) {
start, err := startFilter(*req.URL)
require.NoError(t, err)

m.Lock()
attempt := firstAttempt
m.Unlock()

if attempt.IsZero() {
// if it's the first attempt then
// it generates a failure by closing the connection
// in a rude way (no close message is written)
err = conn.Close()
if err != nil {
return
}

m.Lock()
firstAttempt = start
m.Unlock()
return
}

// it asserts that the second attempt
// has a `start` after the first
// NOTE(review): firstAttempt is read here without holding m; the copy in
// `attempt` holds the same value on this branch - consider using it.
require.True(t, start.After(firstAttempt))

// send a correct logline so we will be able to assert
// that the connection is restored as expected
err = conn.WriteJSON(json.RawMessage(`{"streams":[{"stream":{"key1":"v1","level":"warn"},"values":[["1598282752000000000","logline-after-restored-conn"]]}],"dropped_entities":[]}`))
require.NoError(t, err)

// give the message time to be flushed before cancelling the stream
time.Sleep(time.Millisecond)
cancel()
})

logger := logrus.New()
logger.Out = ioutil.Discard
hook := &testutils.SimpleLogrusHook{HookedLevels: logrus.AllLevels}
logger.AddHook(hook)

c := configFromHTTPMultiBin(tb)
err := c.StreamLogsToLogger(ctx, logger, "ref_id", 0)
require.NoError(t, err)

// the first connection must have been observed, and the logger must have
// received the two reconnection warnings followed by the restored log line
assert.NotZero(t, firstAttempt)
assert.Equal(t,
[]string{
"error reading a message from the cloud",
"trying to establish a fresh connection with the tail logs, this might result in either some repeated or missed messages",
"logline-after-restored-conn",
}, logLines(hook))
})
}

0 comments on commit 31b0be0

Please sign in to comment.