Skip to content

Commit

Permalink
Buffered reading. (#581)
Browse files Browse the repository at this point in the history
* add bufio in between conn and racing reader

* receiver name correction

* fixing tests

* adding ReadBufferSize configuration

* racing reader over client not server

* Refactor

* move `DefaultReadBufferSize` to `bolt/connections`
* wrap reader in buffered reader before passing it into the bolt implementations

* Revert re-using channel inside RacingReader

Avoiding potential race condition where message from previous call could linger
in the channel.

* Code clean-up

---------

Co-authored-by: Stephen Cathcart <stephen.m.cathcart@gmail.com>
Co-authored-by: Robsdedude <rouven.bauer@neo4j.com>
  • Loading branch information
3 people authored Jul 2, 2024
1 parent a9185d9 commit 14cbf69
Show file tree
Hide file tree
Showing 19 changed files with 90 additions and 28 deletions.
2 changes: 2 additions & 0 deletions neo4j/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package neo4j

import (
"github.com/neo4j/neo4j-go-driver/v5/neo4j/config"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/bolt"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/pool"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/notifications"
"math"
Expand Down Expand Up @@ -51,6 +52,7 @@ func defaultConfig() *Config {
NotificationsMinSeverity: notifications.DefaultLevel,
NotificationsDisabledCategories: notifications.NotificationDisabledCategories{},
TelemetryDisabled: false,
ReadBufferSize: bolt.DefaultReadBufferSize,
}
}

Expand Down
5 changes: 5 additions & 0 deletions neo4j/config/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ type Config struct {
//
// default: true
TelemetryDisabled bool
// ReadBufferSize defines the size of the buffer used for reading data from the network connection.
// A larger buffer size can improve performance by reducing the number of read operations required
// for large data transfers. Currently, the default value is 8 KiB, but may change in the future.
// Set to 0 or below to disable buffering.
ReadBufferSize int
}

// ServerAddressResolver is a function type that defines the resolver function used by the routing driver to
Expand Down
6 changes: 3 additions & 3 deletions neo4j/internal/bolt/bolt3.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/telemetry"
itime "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/time"
"net"
"io"
"reflect"
"time"

Expand Down Expand Up @@ -79,7 +79,7 @@ type bolt3 struct {
state int
txId idb.TxHandle
currStream *stream
conn net.Conn
conn io.ReadWriteCloser
serverName string
out *outgoing
in *incoming
Expand All @@ -100,7 +100,7 @@ type bolt3 struct {

func NewBolt3(
serverName string,
conn net.Conn,
conn io.ReadWriteCloser,
errorListener ConnectionErrorListener,
logger log.Logger,
boltLog log.BoltLogger,
Expand Down
2 changes: 2 additions & 0 deletions neo4j/internal/bolt/bolt3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func TestBolt3(outer *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -167,6 +168,7 @@ func TestBolt3(outer *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
AssertNil(t, bolt)
AssertError(t, err)
Expand Down
4 changes: 4 additions & 0 deletions neo4j/internal/bolt/bolt3server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package bolt

import (
"bufio"
"context"
"fmt"
"io"
"net"
"testing"

"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/packstream"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/racing"
)

// Fake of bolt3 server.
Expand All @@ -35,6 +37,7 @@ type bolt3server struct {
conn net.Conn
unpacker *packstream.Unpacker
out *outgoing
reader racing.RacingReader
}

func newBolt3Server(conn net.Conn) *bolt3server {
Expand All @@ -45,6 +48,7 @@ func newBolt3Server(conn net.Conn) *bolt3server {
chunker: newChunker(),
packer: packstream.Packer{},
},
reader: racing.NewRacingReader(bufio.NewReaderSize(conn, DefaultReadBufferSize)),
}
}

Expand Down
6 changes: 3 additions & 3 deletions neo4j/internal/bolt/bolt4.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"errors"
"fmt"
"net"
"io"
"reflect"
"time"

Expand Down Expand Up @@ -94,7 +94,7 @@ type bolt4 struct {
state int
txId idb.TxHandle
streams openstreams
conn net.Conn
conn io.ReadWriteCloser
serverName string
connId string
logId string
Expand All @@ -116,7 +116,7 @@ type bolt4 struct {

func NewBolt4(
serverName string,
conn net.Conn,
conn io.ReadWriteCloser,
errorListener ConnectionErrorListener,
logger log.Logger,
boltLog log.BoltLogger,
Expand Down
5 changes: 5 additions & 0 deletions neo4j/internal/bolt/bolt4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func TestBolt4(outer *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -230,6 +231,7 @@ func TestBolt4(outer *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
AssertNoError(t, err)
bolt.Close(context.Background())
Expand Down Expand Up @@ -259,6 +261,7 @@ func TestBolt4(outer *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
AssertNoError(t, err)
bolt.Close(context.Background())
Expand Down Expand Up @@ -289,6 +292,7 @@ func TestBolt4(outer *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
AssertNoError(t, err)
bolt.Close(context.Background())
Expand All @@ -315,6 +319,7 @@ func TestBolt4(outer *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
AssertNil(t, bolt)
AssertError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions neo4j/internal/bolt/bolt5.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"errors"
"fmt"
"net"
"io"
"reflect"
"time"

Expand Down Expand Up @@ -96,7 +96,7 @@ type bolt5 struct {
state int
txId idb.TxHandle
streams openstreams
conn net.Conn
conn io.ReadWriteCloser
serverName string
queue messageQueue
connId string
Expand All @@ -119,7 +119,7 @@ type bolt5 struct {

func NewBolt5(
serverName string,
conn net.Conn,
conn io.ReadWriteCloser,
errorListener ConnectionErrorListener,
logger log.Logger,
boltLog log.BoltLogger,
Expand Down
9 changes: 9 additions & 0 deletions neo4j/internal/bolt/bolt5_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func TestBolt5(outer *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -304,6 +305,7 @@ func TestBolt5(outer *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
AssertNoError(t, err)
bolt.Close(context.Background())
Expand Down Expand Up @@ -336,6 +338,7 @@ func TestBolt5(outer *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
AssertNoError(t, err)
bolt.Close(context.Background())
Expand Down Expand Up @@ -365,6 +368,7 @@ func TestBolt5(outer *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
AssertNoError(t, err)
bolt.Close(context.Background())
Expand Down Expand Up @@ -396,6 +400,7 @@ func TestBolt5(outer *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
AssertNoError(t, err)
bolt.Close(context.Background())
Expand All @@ -422,6 +427,7 @@ func TestBolt5(outer *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
AssertNil(t, bolt)
AssertError(t, err)
Expand Down Expand Up @@ -457,6 +463,7 @@ func TestBolt5(outer *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
AssertNil(t, bolt)
AssertError(t, err)
Expand Down Expand Up @@ -1771,6 +1778,7 @@ func TestBolt5(outer *testing.T) {
logger,
&boltLogger,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -1847,6 +1855,7 @@ func TestBolt5(outer *testing.T) {
logger,
&boltLogger,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
if err != nil {
t.Error(err)
Expand Down
9 changes: 6 additions & 3 deletions neo4j/internal/bolt/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func Connect(ctx context.Context,
logger log.Logger,
boltLogger log.BoltLogger,
notificationConfig db.NotificationConfig,
readBufferSize int,
) (db.Connection, error) {
// Perform Bolt handshake to negotiate version
// Send handshake to server
Expand Down Expand Up @@ -89,14 +90,16 @@ func Connect(ctx context.Context,

major := buf[3]
minor := buf[2]

bufferedConn := bufferedConnection(conn, readBufferSize)
var boltConn db.Connection
switch major {
case 3:
boltConn = NewBolt3(serverName, conn, errorListener, logger, boltLogger)
boltConn = NewBolt3(serverName, bufferedConn, errorListener, logger, boltLogger)
case 4:
boltConn = NewBolt4(serverName, conn, errorListener, logger, boltLogger)
boltConn = NewBolt4(serverName, bufferedConn, errorListener, logger, boltLogger)
case 5:
boltConn = NewBolt5(serverName, conn, errorListener, logger, boltLogger)
boltConn = NewBolt5(serverName, bufferedConn, errorListener, logger, boltLogger)
case 0:
return nil, fmt.Errorf("server did not accept any of the requested Bolt versions (%#v)", versions)
default:
Expand Down
2 changes: 2 additions & 0 deletions neo4j/internal/bolt/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestConnect(ot *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
AssertError(t, err)
})
Expand All @@ -90,6 +91,7 @@ func TestConnect(ot *testing.T) {
logger,
nil,
idb.NotificationConfig{},
DefaultReadBufferSize,
)
AssertError(t, err)
if boltconn != nil {
Expand Down
29 changes: 27 additions & 2 deletions neo4j/internal/bolt/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,45 @@
package bolt

import (
"bufio"
"context"
"io"
"net"

"github.com/neo4j/neo4j-go-driver/v5/neo4j/db"
idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil"
"net"
)

// DefaultReadBufferSize specifies the default size (in bytes) of the buffer used for reading data from the network connection.
const DefaultReadBufferSize = 8192

func bufferedConnection(conn net.Conn, readBufferSize int) io.ReadWriteCloser {
var reader io.Reader
if readBufferSize > 0 {
reader = bufio.NewReaderSize(conn, readBufferSize)
} else {
reader = conn
}

return struct {
io.Reader
io.Writer
io.Closer
}{
Reader: reader,
Writer: conn,
Closer: conn,
}
}

type ConnectionErrorListener interface {
OnNeo4jError(context.Context, idb.Connection, *db.Neo4jError) error
OnIoError(context.Context, idb.Connection, error)
OnDialError(context.Context, string, error)
}

func handleTerminatedContextError(err error, connection net.Conn) error {
func handleTerminatedContextError(err error, connection io.Closer) error {
if !contextTerminatedErr(err) {
return nil
}
Expand Down
7 changes: 4 additions & 3 deletions neo4j/internal/bolt/dechunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ package bolt
import (
"context"
"encoding/binary"
"io"
"time"

"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil"
rio "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/racing"
"net"
"time"
)

// dechunkMessage takes a buffer to be reused and returns the reusable buffer
Expand All @@ -32,7 +33,7 @@ import (
// Reads will race against the provided context ctx
// If the server provides the connection read timeout hint readTimeout, a new context will be created from that timeout
// and the user-provided context ctx before every read
func dechunkMessage(ctx context.Context, conn net.Conn, msgBuf []byte, readTimeout time.Duration) ([]byte, []byte, error) {
func dechunkMessage(ctx context.Context, conn io.Reader, msgBuf []byte, readTimeout time.Duration) ([]byte, []byte, error) {

sizeBuf := []byte{0x00, 0x00}
off := 0
Expand Down
Loading

0 comments on commit 14cbf69

Please sign in to comment.