Skip to content

Commit

Permalink
Remove read timeout log from critical section
Browse files Browse the repository at this point in the history
  • Loading branch information
fbiville committed Jan 27, 2023
1 parent 23b666f commit d463827
Show file tree
Hide file tree
Showing 12 changed files with 16 additions and 64 deletions.
3 changes: 0 additions & 3 deletions neo4j/internal/bolt/bolt3.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ func NewBolt3(serverName string, conn net.Conn, logger log.Logger, boltLog log.B
boltMajor: 3,
},
connReadTimeout: -1,
logger: logger,
logName: log.Bolt3,
},
birthDate: now,
idleDate: now,
Expand Down Expand Up @@ -207,7 +205,6 @@ func (b *bolt3) Connect(ctx context.Context, minor int, auth map[string]any, use
b.connId = succ.connectionId
connectionLogId := fmt.Sprintf("%s@%s", b.connId, b.serverName)
b.logId = connectionLogId
b.in.logId = connectionLogId
b.in.hyd.logId = connectionLogId
b.out.logId = connectionLogId
b.serverVersion = succ.server
Expand Down
3 changes: 1 addition & 2 deletions neo4j/internal/bolt/bolt3server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package bolt
import (
"context"
"fmt"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/log"
"io"
"net"
"testing"
Expand Down Expand Up @@ -93,7 +92,7 @@ func (s *bolt3server) waitForHello() {
}

func (s *bolt3server) receiveMsg() *testStruct {
_, buf, err := dechunkMessage(context.Background(), s.conn, []byte{}, -1, log.Void{}, "", "")
_, buf, err := dechunkMessage(context.Background(), s.conn, []byte{}, -1)
if err != nil {
panic(err)
}
Expand Down
3 changes: 0 additions & 3 deletions neo4j/internal/bolt/bolt4.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ func NewBolt4(serverName string, conn net.Conn, logger log.Logger, boltLog log.B
boltMajor: 4,
},
connReadTimeout: -1,
logger: logger,
logName: log.Bolt4,
},
lastQid: -1,
}
Expand Down Expand Up @@ -276,7 +274,6 @@ func (b *bolt4) Connect(ctx context.Context, minor int, auth map[string]any, use
connectionLogId := fmt.Sprintf("%s@%s", b.connId, b.serverName)
b.logId = connectionLogId
b.in.hyd.logId = connectionLogId
b.in.logId = connectionLogId
b.out.logId = connectionLogId

b.initializeReadTimeoutHint(succ.configurationHints)
Expand Down
3 changes: 1 addition & 2 deletions neo4j/internal/bolt/bolt4server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package bolt
import (
"context"
"fmt"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/log"
"io"
"net"
"reflect"
Expand Down Expand Up @@ -106,7 +105,7 @@ func (s *bolt4server) waitForHello() map[string]any {
}

func (s *bolt4server) receiveMsg() *testStruct {
_, buf, err := dechunkMessage(context.Background(), s.conn, []byte{}, -1, log.Void{}, "", "")
_, buf, err := dechunkMessage(context.Background(), s.conn, []byte{}, -1)
if err != nil {
panic(err)
}
Expand Down
3 changes: 0 additions & 3 deletions neo4j/internal/bolt/bolt5.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ func NewBolt5(serverName string, conn net.Conn, logger log.Logger, boltLog log.B
useUtc: true,
},
connReadTimeout: -1,
logger: logger,
logName: log.Bolt5,
},
lastQid: -1,
}
Expand Down Expand Up @@ -265,7 +263,6 @@ func (b *bolt5) Connect(ctx context.Context, minor int, auth map[string]any, use
connectionLogId := fmt.Sprintf("%s@%s", b.connId, b.serverName)
b.logId = connectionLogId
b.in.hyd.logId = connectionLogId
b.in.logId = connectionLogId
b.out.logId = connectionLogId

b.initializeReadTimeoutHint(succ.configurationHints)
Expand Down
3 changes: 1 addition & 2 deletions neo4j/internal/bolt/bolt5server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package bolt
import (
"context"
"fmt"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/log"
"io"
"net"
"testing"
Expand Down Expand Up @@ -96,7 +95,7 @@ func (s *bolt5server) waitForHello() map[string]any {
}

func (s *bolt5server) receiveMsg() *testStruct {
_, buf, err := dechunkMessage(context.Background(), s.conn, []byte{}, -1, log.Void{}, "", "")
_, buf, err := dechunkMessage(context.Background(), s.conn, []byte{}, -1)
if err != nil {
panic(err)
}
Expand Down
3 changes: 1 addition & 2 deletions neo4j/internal/bolt/chunker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package bolt
import (
"bytes"
"context"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/log"
"net"
"testing"

Expand Down Expand Up @@ -83,7 +82,7 @@ func TestChunker(ot *testing.T) {

receiveAndAssertMessage := func(t *testing.T, conn net.Conn, expected []byte) {
t.Helper()
_, msg, err := dechunkMessage(context.Background(), conn, []byte{}, -1, log.Void{}, "", "")
_, msg, err := dechunkMessage(context.Background(), conn, []byte{}, -1)
AssertNoError(t, err)
assertSlices(t, msg, expected)
}
Expand Down
38 changes: 5 additions & 33 deletions neo4j/internal/bolt/dechunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"context"
"encoding/binary"
rio "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/racing"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/log"
"net"
"time"
)
Expand All @@ -34,22 +33,15 @@ import (
// Reads will race against the provided context ctx
// If the server provides the connection read timeout hint readTimeout, a new context will be created from that timeout
// and the user-provided context ctx before every read
func dechunkMessage(
ctx context.Context,
conn net.Conn,
msgBuf []byte,
readTimeout time.Duration,
logger log.Logger,
logName string,
logId string) ([]byte, []byte, error) {
func dechunkMessage(ctx context.Context, conn net.Conn, msgBuf []byte, readTimeout time.Duration) ([]byte, []byte, error) {

sizeBuf := []byte{0x00, 0x00}
off := 0

reader := rio.NewRacingReader(conn)

for {
updatedCtx, cancelFunc := newContext(ctx, readTimeout, logger, logName, logId)
updatedCtx, cancelFunc := newContext(ctx, readTimeout)
_, err := reader.ReadFull(updatedCtx, sizeBuf)
if err != nil {
return msgBuf, nil, processReadError(err, ctx, readTimeout)
Expand All @@ -73,7 +65,7 @@ func dechunkMessage(
msgBuf = newMsgBuf
}
// Read the chunk into buffer
updatedCtx, cancelFunc = newContext(ctx, readTimeout, logger, logName, logId)
updatedCtx, cancelFunc = newContext(ctx, readTimeout)
_, err = reader.ReadFull(updatedCtx, msgBuf[off:(off+chunkSize)])
if err != nil {
return msgBuf, nil, processReadError(err, ctx, readTimeout)
Expand All @@ -86,21 +78,9 @@ func dechunkMessage(
}

// newContext computes a new context and cancel function if a readTimeout is set
func newContext(
ctx context.Context,
readTimeout time.Duration,
logger log.Logger,
logName string,
logId string) (context.Context, context.CancelFunc) {

func newContext(ctx context.Context, readTimeout time.Duration) (context.Context, context.CancelFunc) {
if readTimeout >= 0 {
newCtx, cancelFunc := context.WithTimeout(ctx, readTimeout)
logger.Debugf(logName, logId,
"read timeout of %s applied, chunk read deadline is now: %s",
readTimeout.String(),
deadlineOf(newCtx),
)
return newCtx, cancelFunc
return context.WithTimeout(ctx, readTimeout)
}
return ctx, nil
}
Expand All @@ -120,11 +100,3 @@ func processReadError(err error, ctx context.Context, readTimeout time.Duration)
}
return err
}

func deadlineOf(ctx context.Context) string {
deadline, hasDeadline := ctx.Deadline()
if !hasDeadline {
return "N/A (no deadline set)"
}
return deadline.String()
}
9 changes: 4 additions & 5 deletions neo4j/internal/bolt/dechunker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"bytes"
"context"
"encoding/binary"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/log"
"net"
"reflect"
"testing"
Expand Down Expand Up @@ -83,7 +82,7 @@ func TestDechunker(t *testing.T) {
go func() {
AssertWriteSucceeds(t, cli, str.Bytes())
}()
buf, msgBuf, err = dechunkMessage(context.Background(), serv, buf, -1, log.Void{}, "", "")
buf, msgBuf, err = dechunkMessage(context.Background(), serv, buf, -1)
AssertNoError(t, err)
AssertLen(t, msgBuf, int(msg.size))
// Check content of buffer
Expand Down Expand Up @@ -123,7 +122,7 @@ func TestDechunkerWithTimeout(ot *testing.T) {
AssertWriteSucceeds(t, cli, []byte{0x00, 0x00})
}()
buffer := make([]byte, 2)
_, _, err := dechunkMessage(context.Background(), serv, buffer, timeout, log.Void{}, "", "")
_, _, err := dechunkMessage(context.Background(), serv, buffer, timeout)
AssertNoError(t, err)
AssertTrue(t, reflect.DeepEqual(buffer, []byte{0xCA, 0xFE}))
})
Expand All @@ -132,7 +131,7 @@ func TestDechunkerWithTimeout(ot *testing.T) {
serv, cli := net.Pipe()
defer closePipe(ot, serv, cli)

_, _, err := dechunkMessage(context.Background(), serv, nil, timeout, log.Void{}, "", "")
_, _, err := dechunkMessage(context.Background(), serv, nil, timeout)

AssertError(t, err)
AssertStringContain(t, err.Error(), "context deadline exceeded")
Expand All @@ -144,7 +143,7 @@ func TestDechunkerWithTimeout(ot *testing.T) {
ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
defer cancelFunc()

_, _, err := dechunkMessage(ctx, serv, nil, -1, log.Void{}, "", "")
_, _, err := dechunkMessage(ctx, serv, nil, -1)

AssertError(t, err)
AssertStringContain(t, err.Error(), "context deadline exceeded")
Expand Down
3 changes: 1 addition & 2 deletions neo4j/internal/bolt/hydratedehydrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package bolt

import (
"context"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/log"
"net"
"testing"
"time"
Expand Down Expand Up @@ -58,7 +57,7 @@ func TestDehydrateHydrate(ot *testing.T) {
go func() {
out.send(context.Background(), cli)
}()
_, byts, err := dechunkMessage(context.Background(), serv, []byte{}, -1, log.Void{}, "", "")
_, byts, err := dechunkMessage(context.Background(), serv, []byte{}, -1)
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 1 addition & 5 deletions neo4j/internal/bolt/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package bolt

import (
"context"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/log"
"net"
"time"
)
Expand All @@ -30,16 +29,13 @@ type incoming struct {
buf []byte // Reused buffer
hyd hydrator
connReadTimeout time.Duration
logger log.Logger
logName string
logId string
}

func (i *incoming) next(ctx context.Context, rd net.Conn) (any, error) {
// Get next message from transport layer
var err error
var msg []byte
i.buf, msg, err = dechunkMessage(ctx, rd, i.buf, i.connReadTimeout, i.logger, i.logName, i.logId)
i.buf, msg, err = dechunkMessage(ctx, rd, i.buf, i.connReadTimeout)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions neo4j/internal/bolt/outgoing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package bolt
import (
"context"
idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/log"
"net"
"reflect"
"testing"
Expand Down Expand Up @@ -112,7 +111,7 @@ func TestOutgoing(ot *testing.T) {
}()

// Dechunk it
_, byts, err := dechunkMessage(context.Background(), serv, []byte{}, -1, log.Void{}, "", "")
_, byts, err := dechunkMessage(context.Background(), serv, []byte{}, -1)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit d463827

Please sign in to comment.