Add a Read timeout for readbuffer #68

Closed
12 changes: 10 additions & 2 deletions readbuffer.go
@@ -5,23 +5,28 @@ import (
	"errors"
	"fmt"
	"io"
	"os"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
)

const (
	MaxBuffer = 1 << 21
	MaxBuffer          = 1 << 21
	DefaultReadTimeout = 1 * time.Minute
Contributor

Sorry, two things that came to mind.

1. Long running requests

Correct me if I'm wrong, but I believe the remotedialer is used for clients to be able to WATCH kubernetes resources.

In that case, what happens if there are no changes in a minute? Do we / the UI send PING frames (cursory look, seems to be the keepalive mechanism in http2 land) to keep the readbuffer open?

2. Big downloads

For regular http 1.1 requests downloading a big file, I don't remember if the receiver sends data or waits for the whole thing. In case it waits for the whole thing without sending any data back, the readbuffer would be closed after 1 minute, which would close the two pipes and break the connection. If that's the case, do we know about such big downloads within rancher/rancher that would be broken with this new read timeout?

Contributor Author

I'll go in reverse order.

Big downloads

This should be OK IIUC. The http client sends request headers to the http server, which in this case is the remotedialer server, which pipes them to the remotedialer client, which reads them via readbuffer. This does not last long, and after it's done it may time out without consequences.

Of course the response is sent in the opposite direction, with the http server getting the downloaded bytes from a readbuffer - but there data continuously flows, so it will not time out. It will still time out if data stalls for longer than the timeout, which is the whole point anyway.

You can try this out via:

./server/server -debug -id first -token aaa -listen :8121
./client/client -debug -id foo -connect ws://localhost:8121/connect
python3 -m http.server 8125 --bind 127.0.0.1 --directory ~/Downloads/

curl --limit-rate 1K --output /dev/null -v http://localhost:8121/client/foo/http/127.0.0.1:8125/big_file.zip

The --limit-rate option slows down the transfer to make the download last longer.

Long running requests
remotedialer is used for clients to be able to WATCH kubernetes resources.

That is correct, and WATCH does use long-running requests (long polling).

Do we / the UI send PING frames (cursory look, seems to be the keepalive mechanism in http2 land) to keep the readbuffer open?

Bad news: if there are no changes before the timeout the connection will time out and be closed as you predicted. http 2 PING frames won't help here, because the readbuffer timeout is reset only when actual message payloads are delivered - and PINGs carry no data. And by the way, http 1.1 has nothing like that at all - it's delegated to TCP keepalive (which makes it equally invisible to the upper protocols).

This causes, for example, kubectl get --watch to exit:

SMOIOLI@suse-mbp ~ % time kubectl get pods --watch --all-namespaces
[...]
0.11s user 0.04s system 0% cpu 1:00.22 total

Good news: kubectl watch times out anyway for the same reason, just with a longer timeout (30 minutes). IIUC clients of the WATCH API should be prepared to reconnect at any time - client-go does that. Also, the Rancher UI automatically reconnects on dropped WebSockets (which Rancher serves out of WATCH calls). So, to the best of my knowledge and testing, this isn't really a problem for general Rancher navigation, shouldn't generally be a problem for direct API usage (but could expose new bugs in clients), and it's probably a nuisance for interactive kubectl watch users.

One thing I can do quickly is to increase the timeout to a more reasonable value (I will admit that 1 minute was chosen to make test results very evident; I do not have a good reason to keep it that low).

Alternative ideas are also welcome. My goal is to try not to make this intervention too invasive.

Contributor

I believe the problem resides in the server never receiving the close notification, because the client failed and gave up sending it but moved on to closing resources on its side, hence causing the server's list of active connections to differ from the client's.

Something that came to my mind was to extend the protocol with a new message type to communicate the active connection IDs, so that the other end can figure out which connections are no longer needed and close them. This could also be implemented using the payload of ping control messages (according to the RFC, they can contain application data, but the Pong response should then match this payload) by setting a custom PingHandler, although it does not necessarily need to be sent on every ping (a sketch follows below).

This idea would still be compatible with a longer idle timeout, in case users on the client side also fail to close the connection.
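
To make the idea concrete, here is a rough sketch on top of gorilla/websocket (which remotedialer uses for the tunnel). The comma-separated payload format and the closeAllExcept callback are hypothetical, not existing remotedialer API - this is a sketch of the approach, not an implementation proposal:

```go
package keepalive

import (
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/websocket"
)

// installKeepalivePingHandler piggybacks the sender's list of active connection
// IDs on the Ping payload (RFC 6455 allows up to 125 bytes of application data
// in control frames, and the Pong must echo it back).
func installKeepalivePingHandler(conn *websocket.Conn, closeAllExcept func(alive map[int64]bool)) {
	conn.SetPingHandler(func(appData string) error {
		// Echo the payload back, as the default handler does and the RFC requires.
		err := conn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(time.Second))
		if err != nil && err != websocket.ErrCloseSent {
			return err
		}
		// Interpret the payload as a comma-separated list of live connection IDs
		// advertised by the other end (hypothetical format).
		alive := map[int64]bool{}
		for _, field := range strings.Split(appData, ",") {
			if id, parseErr := strconv.ParseInt(field, 10, 64); parseErr == nil {
				alive[id] = true
			}
		}
		// closeAllExcept is a hypothetical callback that would close any local
		// multiplexed connection the other end no longer knows about.
		closeAllExcept(alive)
		return nil
	})
}
```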

Contributor

I believe the problem resides in the server never receiving the close notification, because the client failed and gave up sending it but moved on to closing resources on its side, hence causing the server's list of active connections to differ from the client's.

So, from the PR description, the issue was located in the Read() call, where it was blocked and preventing the goroutine from being removed. This blocking can happen in two places:

  1. In the pipe in Peer B if data is tunneled between peers. If that's the case then the issue is that Peer B stopped receiving data from Peer A. (This means OnData() is not called to fill up the readbuffer).
  2. In the pipe in Client A. If that's the case then the issue is that Client A stopped receiving data from Peer B (which could mean Peer B is also not receiving data from Peer A fwiw).

[Diagram: Rancher - Remote Dialer (drawio attachment)]

Now what you believe is that the server (eg: Peer B) never received a close notification from the client (eg: Client A). I'm struggling to see how that would cause the Read() call to be blocked. Can you elaborate on that?

Contributor Author (@moio), Mar 15, 2024

Now what you believe is that the server (eg: Peer B) never received a close notification from the client (eg: Client A). I'm struggling to see how that would cause the Read() call to be blocked. Can you elaborate on that?

Sure. What we see from flame graphs is an accumulation of goroutines stuck on cond.Wait() inside of Read(). So there are too many of them, and they grow without limit until OOM. We are observing this phenomenon on the local cluster while piping from/to peers, so this is case 1. in your description.

Now, because of how sync.Cond works, once a remotedialer goroutine gets into sync.Cond.Wait(), the only way to get out of it is some other goroutine calling sync.Cond.Broadcast() or sync.Cond.Signal(). In the case of readbuffer, that happens in only three cases (see the sketch after this list):

  1. on Offer(), after at least one new byte has been copied from the source into the readbuffer,
  2. on Read(), after at least one new byte has been read from the readbuffer and is ready to be returned to the caller. That BTW can only happen after an Offer(), or
  3. on Close()
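
To illustrate the blocking behavior, here is a heavily simplified sketch of that pattern - not remotedialer's actual readbuffer, which also tracks deadlines, counters and backpressure - showing that a reader parked in cond.Wait() is only released by one of the events above:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// miniBuffer is a toy stand-in for readBuffer, reduced to the cond.Wait() logic.
type miniBuffer struct {
	cond sync.Cond
	buf  bytes.Buffer
	err  error
}

func newMiniBuffer() *miniBuffer {
	return &miniBuffer{cond: sync.Cond{L: &sync.Mutex{}}}
}

// Offer copies data in and wakes any waiting reader (case 1 above).
func (m *miniBuffer) Offer(data []byte) {
	m.cond.L.Lock()
	defer m.cond.L.Unlock()
	m.buf.Write(data)
	m.cond.Broadcast()
}

// Close records an error and wakes any waiting reader (case 3 above).
func (m *miniBuffer) Close(err error) {
	m.cond.L.Lock()
	defer m.cond.L.Unlock()
	m.err = err
	m.cond.Broadcast()
}

// Read parks in cond.Wait() while the buffer is empty: if neither Offer nor
// Close ever runs, the calling goroutine stays parked forever, which is the
// leak seen in the flame graphs.
func (m *miniBuffer) Read(b []byte) (int, error) {
	m.cond.L.Lock()
	defer m.cond.L.Unlock()
	for m.buf.Len() == 0 {
		if m.err != nil {
			return 0, m.err
		}
		m.cond.Wait()
	}
	return m.buf.Read(b)
}

func main() {
	m := newMiniBuffer()
	done := make(chan struct{})
	go func() {
		b := make([]byte, 8)
		n, err := m.Read(b) // blocks until Offer or Close
		fmt.Println(n, err)
		close(done)
	}()
	m.Offer([]byte("data")) // comment this out and the reader never returns
	<-done
}
```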

For those accumulating goroutines:

  • We know that 1. does not happen - otherwise we would not see a growing count of goroutines stuck in cond.Wait()
  • We know 2. does not happen either - Read() code is actually stuck later, in cond.Wait()
  • Therefore the only reasonable case is that 3. also never happens. That is also indirectly confirmed by the fact that we gave the customer a patched image with the changes in the current version of this PR, which force a close after a certain amount of time, and that stopped the leaking.

Now the question changes to: why is readbuffer.Close() never called?

IIUC readbuffer.Close() is called only by doTunnelClose(), which in turn is called:

  1. when a pipe is done - but pipe is stuck in Read() so that will never happen in this condition, or
  2. when tunnelClose() is called, which happens when:
    2.1. the whole Session is closed, which is clearly not always the case - downstream clusters and peers can of course be up for weeks or more
    2.2. there’s errors during dialing - not the case here, connection is established fine, problem happens later
    2.3. closeConnection is called, which happens when:
    2.3.1. there is an error while sending a connect message - not the case here, messages go through without problems
    2.3.2. there is an error receiving data - not the case here
    2.3.3. connection.Close is called
    2.3.4. an explicit Error message is received

Further considerations are needed for the last two points.

2.3.3 is interesting because remotedialer.connection implements net.Conn and it's returned by Dial outside of remotedialer code. Therefore users of remotedialer.connection can and should, of course, call Close when they are done, and that will end up closing the readbuffer as well. But in this case the server (local cluster) can't know that the client has abruptly disconnected without some notification from the peer, so it might very well never call Close - for all it knows, the connection might still have work to do. So that's my understanding as to why it does not happen either.

2.3.4 is error handling, and it includes the special case when the “error” is actually EOF, which is the way clients/peers notify servers that they are done with a connection. IOW: a graceful connection close mechanism. Now of course remotedialer code should be robust to non-graceful closes as well - EOF messages from clients/peers might never be delivered to servers for a variety of network reasons - if nothing else because all net.Conn writes have a configurable timeout that's outside of remotedialer control. Moreover, in case of failure, graceful closing is not retried.

So what we are saying is that the various close mechanisms in place all seem to have failed; in particular, the delivery of messages for graceful connection close cannot be relied upon in all cases. Therefore we need something more, or something else, to avoid leaks.

The original version of this PR added a Read timeout as a catch-all measure, which of course worked, but might create regressions for very long and inactive, but otherwise legitimate, connections. We propose having peers periodically advertise their known client lists as a potential alternative.

Does that clarify?

Contributor (@tomleb), Mar 15, 2024

Does that clarify?

Yes, thanks for the explanation. I ended up with a similar list of possible paths leading to a blocked Read() some weeks ago, but yesterday I got stuck thinking only about OnData() and forgot the other cases that would also unblock Read().

I think the suggested solution makes sense. A timeout is needed to detect ungraceful connection termination.

This alone is not enough though, because we may end up closing connections too soon if we're not receiving any data. So we need to differentiate between a timeout due to a connection terminated on the other end vs no data coming in for other reasons (eg: waiting on some response, etc). This differentiation can be done by introducing some type of "keepalive" mechanism that indicates that the connection is still active.

We could do this:

  1. Per connection (per Session). There would be one message per session every X seconds telling the other end that this session is still alive (see the sketch after this list). OR
  2. As suggested by @aruiz14, one message per peer every X seconds telling the other peer all the sessions that are still alive. I would be against this if every Session between two peers had a different TCP/http connection, because then you might run into the case of the "keepalive" messages being dropped while the actual data connection is fine, so connections could be dropped for no reason. Looking at the code, it seems like the custom dialer we use simply piggybacks on the same connection, so that's not an issue here.
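
For illustration, a minimal sketch of option 1; sendSessionKeepalive stands in for a new protocol message type that does not exist in remotedialer today:

```go
package keepalive

import (
	"context"
	"time"
)

// runSessionKeepalive sends one small "still alive" message per Session every
// interval, so the other end can tell an idle-but-alive session apart from a
// dead one. sendSessionKeepalive is hypothetical; on failure we simply stop
// and let the usual session teardown close the multiplexed connections.
func runSessionKeepalive(ctx context.Context, interval time.Duration, sendSessionKeepalive func() error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := sendSessionKeepalive(); err != nil {
				return
			}
		}
	}
}
```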

Contributor

We need to make sure that we can reliably detect which sessions are alive from browser -> first peer, otherwise we're just moving the problem. Do we already have a mechanism to reliably detect that a browser has closed the connection (in an ungraceful termination case)?

Contributor

@tomleb sorry I missed your last comment. I assume that we rely on HTTP/TCP to detect that the websocket connection died: Session.Serve will fail obtaining the NextReader and return, making the caller call Session.Close, which will eventually close all the multiplexed connections. The problem here is when the client closes one of those multiplexed connections without the other end noticing.

I've submitted this PR, please take a look when you have the chance, thanks!
#74

)

var ErrReadTimeoutExceeded = errors.New("read timeout exceeded")

type readBuffer struct {
	id, readCount, offerCount int64
	cond                      sync.Cond
	deadline                  time.Time
	buf                       bytes.Buffer
	err                       error
	backPressure              *backPressure
	readTimeout               time.Duration
}

func newReadBuffer(id int64, backPressure *backPressure) *readBuffer {
@@ -31,6 +36,7 @@ func newReadBuffer(id int64, backPressure *backPressure) *readBuffer {
		cond: sync.Cond{
			L: &sync.Mutex{},
		},
		readTimeout: DefaultReadTimeout,
	}
}

@@ -100,13 +106,15 @@ func (r *readBuffer) Read(b []byte) (int, error) {
	now := time.Now()
	if !r.deadline.IsZero() {
		if now.After(r.deadline) {
			return 0, errors.New("deadline exceeded")
			return 0, fmt.Errorf("remotedialer readbuffer Read deadline exceeded: %w", os.ErrDeadlineExceeded)
		}
	}

	var t *time.Timer
	if !r.deadline.IsZero() {
		t = time.AfterFunc(r.deadline.Sub(now), func() { r.cond.Broadcast() })
	} else {
		t = time.AfterFunc(r.readTimeout, func() { r.Close(ErrReadTimeoutExceeded) })
	}
	r.cond.Wait()
	if t != nil {
102 changes: 102 additions & 0 deletions readbuffer_test.go
@@ -0,0 +1,102 @@
package remotedialer

import (
	"bytes"
	"fmt"
	"os"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func Test_readBuffer_Read(t *testing.T) {
	type args struct {
		b []byte
	}
	tests := []struct {
		name     string
		args     args
		deadline time.Duration
		timeout  time.Duration
		offer    []byte
		want     int
		wantErr  assert.ErrorAssertionFunc
	}{
		{
			name: "Read",
			args: args{
				b: make([]byte, 10),
			},
			offer:   []byte("test"),
			want:    4,
			wantErr: assert.NoError,
		},
		{
			name: "Read with timeout",
			args: args{
				b: make([]byte, 10),
			},
			timeout: 100 * time.Millisecond,
			want:    0,
			wantErr: func(t assert.TestingT, err error, msgAndArgs ...interface{}) bool {
				return assert.ErrorIs(t, err, ErrReadTimeoutExceeded)
			},
		},
		{
			name: "Read with deadline",
			args: args{
				b: make([]byte, 10),
			},
			deadline: 100 * time.Millisecond,
			offer:    nil,
			want:     0,
			wantErr: func(t assert.TestingT, err error, msgAndArgs ...interface{}) bool {
				return assert.ErrorIs(t, err, os.ErrDeadlineExceeded)
			},
		},
		{
			name: "Read with timeout and deadline",
			args: args{
				b: make([]byte, 10),
			},
			deadline: 50 * time.Millisecond,
			offer:    nil,
			want:     0,
			wantErr: func(t assert.TestingT, err error, msgAndArgs ...interface{}) bool {
				return assert.ErrorIs(t, err, os.ErrDeadlineExceeded)
			},
		},
	}

	t.Parallel()
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			r := &readBuffer{
				id:           1,
				backPressure: newBackPressure(nil),
				cond: sync.Cond{
					L: &sync.Mutex{},
				},
			}
			if tt.deadline > 0 {
				r.deadline = time.Now().Add(tt.deadline)
			}
			if tt.timeout > 0 {
				r.readTimeout = tt.timeout
			}
			if tt.offer != nil {
				err := r.Offer(bytes.NewReader(tt.offer))
				if err != nil {
					t.Errorf("Offer() returned error: %v", err)
				}
			}
			got, err := r.Read(tt.args.b)
			if !tt.wantErr(t, err, fmt.Sprintf("Read(%v)", tt.args.b)) {
				return
			}
			assert.Equalf(t, tt.want, got, "Read(%v)", tt.args.b)
		})
	}
}