cloudapi/logs: Keep connection reliable #2090
Conversation
Codecov Report
```diff
@@            Coverage Diff             @@
##           master    #2090      +/-   ##
==========================================
+ Coverage   72.09%   72.17%   +0.07%
==========================================
  Files         180      182       +2
  Lines       14249    14370     +121
==========================================
+ Hits        10273    10371      +98
- Misses       3350     3371      +21
- Partials      626      628       +2
```
I haven't looked over all the code, but I have two notes from quickly going through it:
- I think this would've been better handled by just handling an error on reading a message, instead of all of these additional changes which, while great for a general use case, are both overkill and ... not enough, because:
- the URL we connect to has the parameter `start`, which is the point from which Loki should start returning logs. From what I can see, you reconnect to the same URL, which means it will start sending logs from the very beginning of the test. This is both unnecessary and heavy for Loki, and it likely won't work, since Loki will only return 1k or so old logs (I don't remember the exact value) and, if I remember correctly, it returns the oldest ones first ... which is totally not what we need.

So IMO this should've been an additional wrapping for-loop around reading messages that, on the right error, just reconnects ... in the `if err != nil` block, but changing `start` to the last timestamp it has seen. Roughly the shape sketched below.
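A very rough sketch of that shape (the function and callback names here are made up for illustration and are not the actual k6 identifiers; `dial` stands for whatever builds the websocket connection):

```go
package cloudapi

import (
	"time"

	"github.com/gorilla/websocket"
)

// tailWithReconnect is a sketch of the wrapping loop: on a read error it
// reconnects and resumes tailing from the last timestamp it has seen,
// instead of from the very beginning of the test.
func tailWithReconnect(
	dial func(tailFrom time.Time) (*websocket.Conn, error), // hypothetical dial helper
	start time.Time,
	handle func(msg []byte) time.Time, // processes one message, returns its newest timestamp
) error {
	conn, err := dial(start)
	if err != nil {
		return err
	}
	lastSeen := start
	for {
		_, msg, readErr := conn.ReadMessage()
		if readErr != nil {
			_ = conn.Close() // drop the broken connection before redialling
			// ask only for logs newer than the last message we already saw
			conn, err = dial(lastSeen.Add(time.Millisecond))
			if err != nil {
				return err
			}
			continue
		}
		lastSeen = handle(msg)
	}
}
```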
cloudapi/logs.go (Outdated)
```go
	return nil
}

// retry retries a to exeucute a provided function until it isn't succeeded
```
Suggested change:
```diff
-// retry retries a to exeucute a provided function until it isn't succeeded
+// retry retries to execute a provided function until it succeeds
```
Just some minor comments. `gomock` looks useful, but it pulls in a lot of dependencies, so I'd prefer it if we can avoid it and test with `httpmultibin` instead.
Ready for a new review: I simplified the solution and added the exponential backoff. I added them in two separate fixups so we can revert or change them independently. If the current code is accepted, I will RS and remove the now-useless commits.
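For reference, roughly the shape of such a dial backoff helper (a sketch with illustrative names and constants, not the code actually added in the PR; assumes the usual `math` and `time` imports):

```go
// retryWithBackoff calls fn up to attempts times, sleeping with exponential
// backoff between failed attempts (no wait, then base, 5*base, 25*base, ...).
func retryWithBackoff(attempts int, base time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if i > 0 {
			time.Sleep(base * time.Duration(math.Pow(5, float64(i-1))))
		}
		if err = fn(); err == nil {
			return nil
		}
	}
	return err
}
```

With `attempts = 4` and `base = 5*time.Second`, the waits come out to roughly the 0/5s/25s/2m progression discussed further down in the review.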
cloudapi/cloudapi_easyjson.go (Outdated)
```go
// @@ -17,7 +17,7 @@ var (
	_ easyjson.Marshaler
)

func easyjsonC9ac27e5DecodeGithubComLoadimpactK6Cloudapi(in *jlexer.Lexer, out *msgStreams) {
```
This seems like a change due to the module rename, and it will likely be needed in other places as well.
Can you please make a separate PR updating it for ... everything, and base this one on it?
cloudapi/logtail_streamer_mock.go (Outdated)
```go
// Code generated by MockGen. DO NOT EDIT.
// Source: go.k6.io/k6/cloudapi (interfaces: LogtailStreamer)

// Package cloudapi is a generated GoMock package.
package cloudapi
```
Can we drop this, especially as we aren't using it?
cloudapi/logs.go (Outdated)
```go
	headers := make(http.Header)
	headers.Add("Sec-WebSocket-Protocol", "token="+c.Token.String)
	dial := func() error {
		u, err := c.getRequest(referenceID, start)
```
It seems `start` is still not changed between invocations, so at 1h into a test it will still start tailing from the beginning of the test. It should be updated to the value of the latest log we have received before calling it, or a millisecond later or something like that.
Also, it would probably be better as an argument to `dial`? Or at least not be called `start`, but something like `tailFrom`, with `start` being its initial value.
`getRequest` is using `start` as a relative point from `Now`; this way, when the dial function is invoked, all the previous logs should be skipped. We are passing `0` from the cmd, so currently the final query matches `Now`.
https://github.com/grafana/k6/blob/master/cloudapi/logs.go#L101-L104
You are correct. Sorry, I was confused by the name of the parameter (which is also my fault, given that I named it ;) ).
Given that, though, it means that we won't even try to get the messages between when we disconnect and when we finally reconnect.
So we either:
- rename the parameter to something better and less confusing, and open an issue to fix the above later, or
- fix this now so that `start` is the actual `start` parameter to the Loki endpoint, which is the time from which Loki should start returning logs.
Mostly LGTM. I'll take another pass once it's rebased and ready for merging.
Some notes:
- You're planning to remove `gomock` and add tests using `httpmultibin`? 👍
- Nit, and maybe unrelated to this PR, but `Config.getRequest()` parses the same `LogsTailURL` on each call (lines 97 to 101 in 4c5931a):
```go
func (c *Config) getRequest(referenceID string, start time.Duration) (*url.URL, error) {
	u, err := url.Parse(c.LogsTailURL.String)
	if err != nil {
		return nil, fmt.Errorf("couldn't parse cloud logs host %w", err)
	}
```
We could probably do this once and store the `url.URL`, though never mind if it's difficult because of Configuration issues #883. 😅
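Something like this, as a rough sketch of the "parse once" idea (the `logsClient` type and `newLogsClient` constructor are hypothetical names, not the existing `Config` API; it assumes the usual `fmt` and `net/url` imports):

```go
// Parse LogsTailURL a single time and reuse the result on every dial.
type logsClient struct {
	tailURL *url.URL // parsed once at construction, reused by every (re)dial
}

func newLogsClient(rawTailURL string) (*logsClient, error) {
	u, err := url.Parse(rawTailURL)
	if err != nil {
		return nil, fmt.Errorf("couldn't parse cloud logs host %w", err)
	}
	return &logsClient{tailURL: u}, nil
}

func (lc *logsClient) tailRequest(rawQuery string) *url.URL {
	u := *lc.tailURL // shallow copy, so each request can set its own query string
	u.RawQuery = rawQuery
	return &u
}
```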
31b0be0 to 64fd757 (compare)
All the requested changes should be addressed. I moved the tests to HTTPMultiBin. Except this:
@imiric The URL parse is executed only on dialling, which most of the time should happen only once (let me know if I'm wrong). The current solution keeps the code very simple; I tried some changes, but they would introduce more complexity. So, considering the gain here, I would prefer to keep the code simple. WDYT?
cloudapi/logs.go (Outdated)
```go
// @@ -166,6 +173,18 @@ func (c *Config) StreamLogsToLogger(
		if err != nil {
			logger.WithError(err).Warn("error reading a message from the cloud")

			// try to restore the stream establishing a new connection
			logger.Warn("trying to establish a fresh connection with the tail logs, this might result in either some repeated or missed messages") //nolint:lll
```
I think we could remove or rephrase it, because with the current solution "some repeated or missed messages" shouldn't really happen.
Why not combine both warnings? Something like this:
```go
logger.WithError(err).Warn("error reading a log message from the cloud, trying to establish a fresh connection with the logs service...")
```
cloudapi/logs.go
Outdated
// try to restore the stream establishing a new connection | ||
logger.Warn("trying to establish a fresh connection with the tail logs, this might result in either some repeated or missed messages") //nolint:lll | ||
|
||
newconn, errd := c.logtailConn(ctx, referenceID, time.Now()) |
Given how time can drift due to network latency and differ between the client and the server, we should probably use the time of the last received log message (+1ms to avoid repetition) here. We should fall back to `time.Now()` only if we haven't received any log messages so far.
Updated. I'm not so happy with the integration tests' complexity; I will try to polish and simplify them once the core solution is approved.
cloudapi/logs.go (Outdated)
```go
func (ms msgStreams) LatestTimestamp() (ts int64) {
	if len(ms.Values) < 1 {
		return
	}
	for i := 0; i < len(ms.Values); i++ {
		raw := ms.Values[i][0]
		unix, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			return
		}
		if unix > ts {
			ts = unix
		}
	}
	return
}
```
Loki should always (by definition) return the logs in order, so in our case, if I remember correctly, the last message in the slice always has the latest timestamp.
> For any single log stream, logs must always be sent in increasing time order. If a log is received with a timestamp older than the most recent log received for that stream, that log will be dropped.

Apparently, you're right. If the client (k6) sends a log out of order, Loki will drop it. So we can safely consider the last item in the slice as the latest; see the sketch below.
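In that case the method from the hunk above could collapse to something like this (just a sketch of the simplification, not the final code):

```go
func (ms msgStreams) LatestTimestamp() int64 {
	if len(ms.Values) < 1 {
		return 0
	}
	// Loki guarantees ascending order within a stream, so the last entry
	// carries the newest timestamp (unix nanoseconds encoded as a string).
	ts, err := strconv.ParseInt(ms.Values[len(ms.Values)-1][0], 10, 64)
	if err != nil {
		return 0
	}
	return ts
}
```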
Not only that, but tail specifically returns them ordered, as otherwise logging them would be pretty bad ;).
Not that we currently interleave log messages from different streams when we log them on screen ... as I've just noticed 🤦
cloudapi/logs.go (Outdated)
```go
	var ts int64
	for _, stream := range m.Streams {
		sts := stream.LatestTimestamp()
		if sts > ts {
			ts = sts
		}
	}
```
I would argue you technically need to look at the droppedEntries as well, but I don't think it matters all that much. It basically means we got a lot of logs and Loki couldn't send them all (or we couldn't read fast enough), so even if we looked at them and chose a different timestamp we would still probably be dropping a lot of logs. I think just checking the Streams is fine for now.
Should I consider adding a comment regarding it?
yeah, let's add a comment
Looking at it now, we already go through all the streams and dropped entries in the `m.Log` above, so maybe just return the `latestTimestamp` from there?
cloudapi/logs.go (Outdated)
```go
			newconn, errd := c.logtailConn(ctx, referenceID, latest.TimeOrNow())
```
I would argue that the whole readMessage, send on channel, set latest timestamp sequence is now racy (logically, if not programmatically, given the inclusion of atomics).
I doubt it will make a difference unless we receive messages so fast that we can't log them before the next one arrives and a third one errors, so maybe don't do anything, but add a comment saying that this non-exact correctness is on purpose?
I added the comment. I would like a 100% correct solution, but I haven't yet found one that keeps the code as simple as it is now. If you have suggestions, they are welcome.
LGTM, just a minor comment.
Nice work on the tests, `cloudapi/logs.go` is 90% covered 👍 Not sure how we could simplify them; they're readable as is, I think.
Updated with the requested changes.
I have some nitpicks that can be fixed later, but it looks ready either way, good job 👏.
I kind of wonder if the 0s, 5s, 25s, 2m progression isn't a bit too much backoff, but hopefully nobody will be hitting this ... that hard ;)
I still have to test it locally somehow, though, so if anybody has good suggestions, I welcome them ;)
cloudapi/logs.go (Outdated)
```go
			if errd == nil {
				mconn.Lock()
				conn = newconn
				mconn.Unlock()
				continue
			}

			// return the main error
			return err
```
nit: I would've checked `errd != nil`, returned `err` there, and then skipped the `continue`, keeping the good case at a smaller indentation; something like the sketch below.
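A possible shape of that, as a sketch reusing the variables from the hunk above (whether the `continue` can really be dropped depends on where this sits in the read loop):

```go
			newconn, errd := c.logtailConn(ctx, referenceID, latest.TimeOrNow())
			if errd != nil {
				// reconnecting failed as well: return the original read error
				return err
			}
			mconn.Lock()
			conn = newconn
			mconn.Unlock()
```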
> then skip the continue

I reversed the check to the classic `!= nil`, but I'm not sure I understand how you would change it to avoid the `continue`.
cloudapi/logs.go (Outdated)
```go
	if ts < 1 {
		return
	}
```
I kind of feel this isn't needed ... if we are logging, you have a `ts` bigger than 1 ;)
cloudapi/logs.go (Outdated)
```go
	t = time.Now()
	if ts := atomic.LoadInt64(&tst.ts); ts > 0 {
		// add 1ms for avoid possible repetition
		t = time.Unix(0, ts).Add(1 * time.Millisecond)
	}
```
nit: I feel like this would be better off inlined at the place it's used.
Currently, the whole timestampTrack is one int64 that we atomically change under the hood, and here you have some business logic on top. Given that it is literally set in one place and read in one place, I would prefer the business logic to be inlined, with just that int64 being atomically read and written, instead of something where I now need to go read that `TimeOrNow` (which I guess should be `ValueOrTimeNow`?) which also adds 1ms in one of the cases; see the sketch below.
p.s. It's also probably better to add 1 nanosecond, just in case 🤣. But I still feel this is better off at the place we are using it.
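Roughly what that inlining could look like at the call site (a sketch only; `latestTS` stands in for the bare int64 that the read loop stores atomically, and is not an actual identifier in the PR):

```go
			tailFrom := time.Now()
			if ts := atomic.LoadInt64(&latestTS); ts > 0 {
				// resume just after the last log we saw, to avoid repeating it
				tailFrom = time.Unix(0, ts).Add(time.Nanosecond)
			}
			newconn, errd := c.logtailConn(ctx, referenceID, tailFrom)
```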
New changes are ready for review.
The code LGTM, though I haven't tested it in practice; I plan to do that after we merge in `master`.
It makes the log tail connection more fault-tolerant:
* it tries to acquire a new connection in case of error;
* the dial operation retries a few times with exponential backoff before returning a final error.

It adds some capabilities to reconnect to our log tail service.
Some questions/notes:
- In the `cloudapi` package, the `_easyjson` file gets some changes.
- It adds `gomock` as a dependency to support mocking some interfaces for unit testing. I find it generally very useful, but we are not using it, so I expect some cons with k6 that I'm not considering.
- Regarding `gorilla/websocket`: if I'm on the right track with this PR, we could consider designing a better abstraction over the real streaming implementation.

Closes #1966