Skip to content

Commit

Permalink
Stuck with session error 'Already in tx'
Browse files Browse the repository at this point in the history
When an error happens when committing or rolling back a transaction the
session get 'stuck' in transaction mode which means that the session
couldn't be used for further work.

fix #130
  • Loading branch information
Peter Wilhelmsson authored and 2hdddg committed Aug 4, 2020
1 parent b626aa9 commit c8f9517
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 12 deletions.
6 changes: 3 additions & 3 deletions neo4j/internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ func (p *Pool) getServers() map[string]*server {
}

// Prune all old connection on all the servers, this makes sure that servers
// gets removed from the map at some point in time (as long as someone
// borrows new connections). If there is a noticed failed connect still active
// we should wait a while with removal to get prioritization right.
// gets removed from the map at some point in time. If there is a noticed
// failed connect still active we should wait a while with removal to get
// prioritization right.
func (p *Pool) CleanUp() {
p.serversMut.Lock()
defer p.serversMut.Unlock()
Expand Down
148 changes: 148 additions & 0 deletions neo4j/no_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright (c) 2002-2020 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package neo4j

import (
"context"
"reflect"
"testing"
"time"

"github.com/neo4j/neo4j-go-driver/neo4j/internal/db"
"github.com/neo4j/neo4j-go-driver/neo4j/internal/pool"
)

func assertErrorEq(t *testing.T, err1, err2 error) {
if !reflect.DeepEqual(err1, err2) {
t.Errorf("Wrong type of error, '%s' != '%s'", err1, err2)
}
}

// Fake implementation of router
type testRouter struct {
readers []string
writers []string
err error
}

func (r *testRouter) Readers(database string) ([]string, error) {
return r.readers, r.err
}

func (r *testRouter) Writers(database string) ([]string, error) {
return r.writers, r.err
}

func (r *testRouter) Invalidate(database string) {
}

func (r *testRouter) CleanUp() {
}

// Fake implementation of connection pool
type testPool struct {
borrowConn pool.Connection
err error
returnHook func()
}

func (p *testPool) Borrow(ctx context.Context, serverNames []string, wait bool) (pool.Connection, error) {
return p.borrowConn, p.err
}

func (p *testPool) Return(c pool.Connection) {
if p.returnHook != nil {
p.returnHook()
}
}

func (p *testPool) CleanUp() {
}

// Fake implementation of db connection
type testConn struct {
err error
txBeginHandle db.Handle
runStream *db.Stream
runTxStream *db.Stream
nextRecord *db.Record
nextSummary *db.Summary
bookmark string
serverName string
serverVersion string
isAlive bool
birthDate time.Time
txCommitErr error
txRollbackErr error
resetHook func()
}

func (c *testConn) TxBegin(mode db.AccessMode, bookmarks []string, timeout time.Duration, meta map[string]interface{}) (db.Handle, error) {
return c.txBeginHandle, c.err
}

func (c *testConn) TxRollback(tx db.Handle) error {
return c.txRollbackErr
}

func (c *testConn) TxCommit(tx db.Handle) error {
return c.txCommitErr
}

func (c *testConn) Run(cypher string, params map[string]interface{}, mode db.AccessMode, bookmarks []string, timeout time.Duration, meta map[string]interface{}) (*db.Stream, error) {
return c.runStream, c.err
}

func (c *testConn) RunTx(tx db.Handle, cypher string, params map[string]interface{}) (*db.Stream, error) {
return c.runTxStream, c.err
}

func (c *testConn) Next(streamHandle db.Handle) (*db.Record, *db.Summary, error) {
return c.nextRecord, c.nextSummary, c.err
}

func (c *testConn) Bookmark() string {
return c.bookmark
}

func (c *testConn) ServerName() string {
return c.serverName
}

func (c *testConn) ServerVersion() string {
return c.serverVersion
}

func (c *testConn) IsAlive() bool {
return c.isAlive
}

func (c *testConn) Birthdate() time.Time {
return c.birthDate
}

func (c *testConn) Reset() {
if c.resetHook != nil {
c.resetHook()
}
}

func (c *testConn) Close() {
}
23 changes: 14 additions & 9 deletions neo4j/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,19 @@ type SessionConfig struct {
DatabaseName string
}

// Connection pool as seen by the session.
type sessionPool interface {
Borrow(ctx context.Context, serverNames []string, wait bool) (pool.Connection, error)
Return(c pool.Connection)
CleanUp()
}

type session struct {
config *Config
defaultMode db.AccessMode
bookmarks []string
databaseName string
pool *pool.Pool
pool sessionPool
router sessionRouter
conn db.Connection
inTx bool
Expand All @@ -75,6 +82,7 @@ type session struct {
now func() time.Time
logId string
log log.Logger
throttleTime time.Duration
}

var sessionid uint32
Expand Down Expand Up @@ -103,7 +111,7 @@ func cleanupBookmarks(bookmarks []string) []string {
return cleaned
}

func newSession(config *Config, router sessionRouter, pool *pool.Pool,
func newSession(config *Config, router sessionRouter, pool sessionPool,
mode db.AccessMode, bookmarks []string, databaseName string, logger log.Logger) *session {

id := atomic.AddUint32(&sessionid, 1)
Expand All @@ -121,6 +129,7 @@ func newSession(config *Config, router sessionRouter, pool *pool.Pool,
now: time.Now,
log: logger,
logId: logId,
throttleTime: time.Second * 1,
}
}

Expand Down Expand Up @@ -215,16 +224,12 @@ func (s *session) beginTransaction(mode db.AccessMode, config *TransactionConfig
}

err := conn.TxCommit(txHandle)
if err == nil {
s.inTx = false
}
s.inTx = false
return err
},
rollback: func() error {
err := conn.TxRollback(txHandle)
if err == nil {
s.inTx = false
}
s.inTx = false
return err
},
}, nil
Expand Down Expand Up @@ -275,7 +280,7 @@ func (s *session) runRetriable(
var (
maxDeadErrors = s.config.MaxConnectionPoolSize / 2
maxClusterErrors = 1
throttle = throttler(1 * time.Second)
throttle = throttler(s.throttleTime)
start time.Time
)
for {
Expand Down
105 changes: 105 additions & 0 deletions neo4j/session_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) 2002-2020 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package neo4j

import (
"errors"
"testing"
"time"

"github.com/neo4j/neo4j-go-driver/neo4j/internal/db"
"github.com/neo4j/neo4j-go-driver/neo4j/internal/log"
)

func TestSession(st *testing.T) {
logger := log.ConsoleLogger{Errors: true, Infos: true, Warns: false}

assertCleanSessionState := func(t *testing.T, sess *session) {
if sess.inTx {
t.Errorf("Session should not be in tx mode")
}
}

createSession := func() (*testRouter, *testPool, *session) {
conf := Config{MaxTransactionRetryTime: 3 * time.Millisecond}
router := testRouter{}
pool := testPool{}
sess := newSession(&conf, &router, &pool, db.ReadMode, []string{}, "", &logger)
sess.throttleTime = time.Millisecond * 1
return &router, &pool, sess
}

st.Run("Retry mechanism", func(rt *testing.T) {
// Checks that retries occur on database error and that it stops retrying after a certain
// amount of time and that connections are returned to pool upon failure.
rt.Run("Consistent transient error", func(t *testing.T) {
_, pool, sess := createSession()
numReturns := 0
pool.returnHook = func() {
numReturns++
}
conn := &testConn{isAlive: true}
pool.borrowConn = conn
transientErr := &db.DatabaseError{Code: "Neo.TransientError.General.MemoryPoolOutOfMemoryError"}
numRetries := 0

_, err := sess.WriteTransaction(func(tx Transaction) (interface{}, error) {
// Previous connection should be returned to pool since it failed
if numRetries > 0 && numReturns != numRetries {
t.Errorf("Should have returned previous connection to pool")
}
numRetries++
return nil, transientErr
})

if numRetries < 2 {
t.Errorf("Should have retried at least once but executed %d", numRetries)
}
assertErrorEq(t, transientErr, err)
assertCleanSessionState(t, sess)
})

// Checks that session is in clean state after connection fails to rollback.
// "User" initiates rollback by letting the transaction function return a custom error.
rt.Run("Failed rollback", func(t *testing.T) {
_, pool, sess := createSession()
rollbackErr := errors.New("Rollback error")
causeOfRollbackErr := errors.New("Cause of rollback")
pool.borrowConn = &testConn{isAlive: true, txRollbackErr: rollbackErr}
_, err := sess.WriteTransaction(func(tx Transaction) (interface{}, error) {
return nil, causeOfRollbackErr
})
assertErrorEq(t, causeOfRollbackErr, err)
assertCleanSessionState(t, sess)
})

// Check that sesssion is in clean state after connection fails to commit.
rt.Run("Failed commit", func(t *testing.T) {
_, pool, sess := createSession()
commitErr := errors.New("Commit error")
pool.borrowConn = &testConn{isAlive: true, txCommitErr: commitErr}
_, err := sess.WriteTransaction(func(tx Transaction) (interface{}, error) {
return nil, nil
})
assertErrorEq(t, commitErr, err)
assertCleanSessionState(t, sess)
})
})
}

0 comments on commit c8f9517

Please sign in to comment.