Skip to content

Commit

Permalink
Remove unneeded bookmark DB tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
fbiville committed Nov 23, 2022
1 parent 64e7408 commit dc3ed9c
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 101 deletions.
4 changes: 2 additions & 2 deletions neo4j/bookmarks.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ func (b *bookmarkManager) GetBookmarks(ctx context.Context) (Bookmarks, error) {
}
extraBookmarks = bookmarks
}
b.mutex.RLock()
defer b.mutex.RUnlock()
if len(b.bookmarks) == 0 {
return extraBookmarks, nil
}
b.mutex.RLock()
defer b.mutex.RUnlock()
bookmarks := b.bookmarks.Copy()
if extraBookmarks == nil {
return bookmarks.Values(), nil
Expand Down
7 changes: 2 additions & 5 deletions neo4j/internal/bolt/bolt3.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,11 +617,8 @@ func (b *bolt3) receiveNext(ctx context.Context) (*db.Record, *db.Summary, error
}
}

func (b *bolt3) Bookmark() (string, string) {
// the bookmark database is empty here since multi-tenancy (aka multi-database)
// is a Neo4j 4.x EE feature
// only Bolt 4.0+ connections may return a non-empty value
return b.bookmark, ""
func (b *bolt3) Bookmark() string {
return b.bookmark
}

func (b *bolt3) IsAlive() bool {
Expand Down
14 changes: 7 additions & 7 deletions neo4j/internal/bolt/bolt3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func TestBolt3(outer *testing.T) {

bolt.TxCommit(context.Background(), tx)
assertBoltState(t, bolt3_ready, bolt)
bookmark, _ := bolt.Bookmark()
bookmark := bolt.Bookmark()
AssertStringEqual(t, committedBookmark, bookmark)
})

Expand All @@ -226,7 +226,7 @@ func TestBolt3(outer *testing.T) {
assertBoltState(t, bolt3_streamingtx, bolt)
bolt.TxCommit(context.Background(), tx)
assertBoltState(t, bolt3_ready, bolt)
bookmark, _ := bolt.Bookmark()
bookmark := bolt.Bookmark()
AssertStringEqual(t, committedBookmark, bookmark)
})

Expand All @@ -243,7 +243,7 @@ func TestBolt3(outer *testing.T) {
idb.TxConfig{Mode: idb.ReadMode, Bookmarks: []string{"bm1"}})
assertBoltState(t, bolt3_failed, bolt)
AssertError(t, err)
bookmark, _ := bolt.Bookmark()
bookmark := bolt.Bookmark()
AssertStringEqual(t, "", bookmark)
})

Expand Down Expand Up @@ -386,7 +386,7 @@ func TestBolt3(outer *testing.T) {
err := bolt.Buffer(context.Background(), stream)
AssertNoError(t, err)
// The bookmark should be set
bookmark, _ := bolt.Bookmark()
bookmark := bolt.Bookmark()
AssertStringEqual(t, bookmark, runBookmark)

// Server closed connection and bolt will go into failed state
Expand Down Expand Up @@ -436,7 +436,7 @@ func TestBolt3(outer *testing.T) {
rec, sum, err = bolt.Next(context.Background(), stream)
AssertNextOnlyError(t, rec, sum, err)
// Should be no bookmark since we failed
bookmark, _ := bolt.Bookmark()
bookmark := bolt.Bookmark()
AssertStringEqual(t, bookmark, "")
})

Expand Down Expand Up @@ -467,7 +467,7 @@ func TestBolt3(outer *testing.T) {
AssertNoError(t, err)
AssertNotNil(t, sum)
// The bookmark should be set
bookmark, _ := bolt.Bookmark()
bookmark := bolt.Bookmark()
AssertStringEqual(t, bookmark, runBookmark)
AssertStringEqual(t, sum.Bookmark, runBookmark)

Expand Down Expand Up @@ -503,7 +503,7 @@ func TestBolt3(outer *testing.T) {
sum, err := bolt.Consume(context.Background(), stream)
AssertNeo4jError(t, err)
AssertNil(t, sum)
bookmark, _ := bolt.Bookmark()
bookmark := bolt.Bookmark()
AssertStringEqual(t, bookmark, "")

// Should not get the summary since there was an error
Expand Down
50 changes: 21 additions & 29 deletions neo4j/internal/bolt/bolt4.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,25 @@ func (i *internalTx4) toMeta() map[string]any {
}

type bolt4 struct {
state int
txId idb.TxHandle
streams openstreams
conn net.Conn
serverName string
out outgoing
in incoming
connId string
logId string
serverVersion string
tfirst int64 // Time that server started streaming
bookmark string // Last bookmark
bookmarkDatabase string // Last bookmark's database
birthDate time.Time
log log.Logger
databaseName string
err error // Last fatal error
minor int
lastQid int64 // Last seen qid
idleDate time.Time
state int
txId idb.TxHandle
streams openstreams
conn net.Conn
serverName string
out outgoing
in incoming
connId string
logId string
serverVersion string
tfirst int64 // Time that server started streaming
bookmark string // Last bookmark
birthDate time.Time
log log.Logger
databaseName string
err error // Last fatal error
minor int
lastQid int64 // Last seen qid
idleDate time.Time
}

func NewBolt4(serverName string, conn net.Conn, logger log.Logger, boltLog log.BoltLogger) *bolt4 {
Expand Down Expand Up @@ -388,9 +387,6 @@ func (b *bolt4) TxCommit(ctx context.Context, txh idb.TxHandle) error {
// Keep track of bookmark
if len(succ.bookmark) > 0 {
b.bookmark = succ.bookmark
// SUCCESS can include the DB the bookmarks belong to when SSR is on
// and queries like `USE myDb MATCH (...)` are executed
b.bookmarkDatabase = succ.db
}

// Transition into ready state
Expand Down Expand Up @@ -805,9 +801,6 @@ func (b *bolt4) receiveNext(ctx context.Context) (*db.Record, bool, *db.Summary)
sum.TFirst = b.tfirst
if len(sum.Bookmark) > 0 {
b.bookmark = sum.Bookmark
// SUCCESS can include the DB the bookmarks belong to when SSR is on
// and queries like `USE myDb MATCH (...)` are executed
b.bookmarkDatabase = sum.Database
}
// Done with this stream
b.streams.detach(sum, nil)
Expand All @@ -823,8 +816,8 @@ func (b *bolt4) receiveNext(ctx context.Context) (*db.Record, bool, *db.Summary)
}
}

func (b *bolt4) Bookmark() (string, string) {
return b.bookmark, b.bookmarkDatabase
func (b *bolt4) Bookmark() string {
return b.bookmark
}

func (b *bolt4) IsAlive() bool {
Expand All @@ -848,7 +841,6 @@ func (b *bolt4) Reset(ctx context.Context) {
b.log.Debugf(log.Bolt4, b.logId, "Resetting connection internal state")
b.txId = 0
b.bookmark = ""
b.bookmarkDatabase = ""
b.databaseName = idb.DefaultDatabase
b.err = nil
b.lastQid = -1
Expand Down
18 changes: 9 additions & 9 deletions neo4j/internal/bolt/bolt4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func TestBolt4(outer *testing.T) {

bolt.TxCommit(context.Background(), tx)
assertBoltState(t, bolt4_ready, bolt)
bookmark, _ := bolt.Bookmark()
bookmark := bolt.Bookmark()
AssertStringEqual(t, committedBookmark, bookmark)
})

Expand Down Expand Up @@ -501,7 +501,7 @@ func TestBolt4(outer *testing.T) {
assertBoltState(t, bolt4_streamingtx, bolt)
bolt.TxCommit(context.Background(), tx)
assertBoltState(t, bolt4_ready, bolt)
bookmark, _ := bolt.Bookmark()
bookmark := bolt.Bookmark()
AssertStringEqual(t, committedBookmark, bookmark)
})

Expand All @@ -518,7 +518,7 @@ func TestBolt4(outer *testing.T) {
idb.TxConfig{Mode: idb.ReadMode, Bookmarks: []string{"bm1"}})
assertBoltState(t, bolt4_failed, bolt)
AssertError(t, err)
bookmark, _ := bolt.Bookmark()
bookmark := bolt.Bookmark()
AssertStringEqual(t, "", bookmark)
})

Expand Down Expand Up @@ -683,7 +683,7 @@ func TestBolt4(outer *testing.T) {
err := bolt.Buffer(context.Background(), stream)
AssertNoError(t, err)
// The bookmark should be set
bookmark, _ := bolt.Bookmark()
bookmark := bolt.Bookmark()
AssertStringEqual(t, bookmark, runBookmark)

// Server closed connection and bolt will go into failed state
Expand Down Expand Up @@ -732,7 +732,7 @@ func TestBolt4(outer *testing.T) {
err = bolt.Buffer(context.Background(), stream)
AssertNoError(t, err)
// The bookmark should be set
bookmark, _ = bolt.Bookmark()
bookmark = bolt.Bookmark()
AssertStringEqual(t, bookmark, bookmark)

for i := 0; i < 4; i++ {
Expand Down Expand Up @@ -778,7 +778,7 @@ func TestBolt4(outer *testing.T) {
rec, sum, err = bolt.Next(context.Background(), stream)
AssertNextOnlyError(t, rec, sum, err)
// Should be no bookmark since we failed
bookmark, _ := bolt.Bookmark()
bookmark := bolt.Bookmark()
AssertStringEqual(t, bookmark, "")
})

Expand Down Expand Up @@ -810,7 +810,7 @@ func TestBolt4(outer *testing.T) {
AssertNotNil(t, sum)
assertBoltState(t, bolt4_ready, bolt)
// The bookmark should be set
bookmark, _ := bolt.Bookmark()
bookmark := bolt.Bookmark()
AssertStringEqual(t, bookmark, runBookmark)
AssertStringEqual(t, sum.Bookmark, runBookmark)

Expand Down Expand Up @@ -856,7 +856,7 @@ func TestBolt4(outer *testing.T) {
assertBoltState(t, bolt4_ready, bolt)

// The bookmark should be set
bookmark, _ = bolt.Bookmark()
bookmark = bolt.Bookmark()
AssertStringEqual(t, bookmark, bookmark)
AssertStringEqual(t, sum.Bookmark, bookmark)

Expand Down Expand Up @@ -892,7 +892,7 @@ func TestBolt4(outer *testing.T) {
sum, err := bolt.Consume(context.Background(), stream)
AssertNeo4jError(t, err)
AssertNil(t, sum)
bookmark, _ := bolt.Bookmark()
bookmark := bolt.Bookmark()
AssertStringEqual(t, bookmark, "")

// Should not get the summary since there was an error
Expand Down
50 changes: 21 additions & 29 deletions neo4j/internal/bolt/bolt5.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,26 +79,25 @@ func (i *internalTx5) toMeta() map[string]any {
}

type bolt5 struct {
state int
txId idb.TxHandle
streams openstreams
conn net.Conn
serverName string
out outgoing
in incoming
connId string
logId string
serverVersion string
tfirst int64 // Time that server started streaming
bookmark string // Last bookmark
bookmarkDatabase string // Last bookmark's database
birthDate time.Time
log log.Logger
databaseName string
err error // Last fatal error
minor int
lastQid int64 // Last seen qid
idleDate time.Time
state int
txId idb.TxHandle
streams openstreams
conn net.Conn
serverName string
out outgoing
in incoming
connId string
logId string
serverVersion string
tfirst int64 // Time that server started streaming
bookmark string // Last bookmark
birthDate time.Time
log log.Logger
databaseName string
err error // Last fatal error
minor int
lastQid int64 // Last seen qid
idleDate time.Time
}

func NewBolt5(serverName string, conn net.Conn, logger log.Logger, boltLog log.BoltLogger) *bolt5 {
Expand Down Expand Up @@ -366,9 +365,6 @@ func (b *bolt5) TxCommit(ctx context.Context, txh idb.TxHandle) error {
// Keep track of bookmark
if len(succ.bookmark) > 0 {
b.bookmark = succ.bookmark
// SUCCESS can include the DB the bookmarks belong to when SSR is on
// and queries like `USE myDb MATCH (...)` are executed
b.bookmarkDatabase = succ.db
}

// Transition into ready state
Expand Down Expand Up @@ -778,9 +774,6 @@ func (b *bolt5) receiveNext(ctx context.Context) (*db.Record, bool, *db.Summary)
sum.TFirst = b.tfirst
if len(sum.Bookmark) > 0 {
b.bookmark = sum.Bookmark
// SUCCESS can include the DB the bookmarks belong to when SSR is on
// and queries like `USE myDb MATCH (...)` are executed
b.bookmarkDatabase = sum.Database
}
// Done with this stream
b.streams.detach(sum, nil)
Expand All @@ -796,8 +789,8 @@ func (b *bolt5) receiveNext(ctx context.Context) (*db.Record, bool, *db.Summary)
}
}

func (b *bolt5) Bookmark() (string, string) {
return b.bookmark, b.bookmarkDatabase
func (b *bolt5) Bookmark() string {
return b.bookmark
}

func (b *bolt5) IsAlive() bool {
Expand All @@ -821,7 +814,6 @@ func (b *bolt5) Reset(ctx context.Context) {
b.log.Debugf(log.Bolt5, b.logId, "Resetting connection internal state")
b.txId = 0
b.bookmark = ""
b.bookmarkDatabase = ""
b.databaseName = idb.DefaultDatabase
b.err = nil
b.lastQid = -1
Expand Down
Loading

0 comments on commit dc3ed9c

Please sign in to comment.