diff --git a/connection.go b/connection.go index bb28151..90af96e 100644 --- a/connection.go +++ b/connection.go @@ -29,7 +29,7 @@ func newConnection(connID int64, session *Session, proto, address string) *conne session: session, } c.backPressure = newBackPressure(c) - c.buffer = newReadBuffer(c.backPressure) + c.buffer = newReadBuffer(connID, c.backPressure) metrics.IncSMTotalAddConnectionsForWS(session.clientKey, proto, address) return c } diff --git a/readbuffer.go b/readbuffer.go index 9ce7fcb..60ffdcf 100644 --- a/readbuffer.go +++ b/readbuffer.go @@ -16,16 +16,17 @@ const ( ) type readBuffer struct { - readCount, offerCount int64 - cond sync.Cond - deadline time.Time - buf bytes.Buffer - err error - backPressure *backPressure + id, readCount, offerCount int64 + cond sync.Cond + deadline time.Time + buf bytes.Buffer + err error + backPressure *backPressure } -func newReadBuffer(backPressure *backPressure) *readBuffer { +func newReadBuffer(id int64, backPressure *backPressure) *readBuffer { return &readBuffer{ + id: id, backPressure: backPressure, cond: sync.Cond{ L: &sync.Mutex{}, @@ -60,7 +61,7 @@ func (r *readBuffer) Offer(reader io.Reader) error { } if r.buf.Len() > MaxBuffer*2 { - logrus.Errorf("remotedialer buffer exceeded, length: %d", r.buf.Len()) + logrus.Errorf("remotedialer buffer exceeded id=%d, length: %d", r.id, r.buf.Len()) } return nil @@ -87,6 +88,11 @@ func (r *readBuffer) Read(b []byte) (int, error) { return n, nil } + if r.buf.Cap() > MaxBuffer/8 { + logrus.Debugf("resetting remotedialer buffer id=%d to zero, old cap %d", r.id, r.buf.Cap()) + r.buf = bytes.Buffer{} + } + if r.err != nil { return 0, r.err }