forked from neo4j/neo4j-go-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Stuck with session error 'Already in tx'
When an error happens when committing or rolling back a transaction the session get 'stuck' in transaction mode which means that the session couldn't be used for further work. fix neo4j#130
Peter Wilhelmsson
committed
Aug 4, 2020
1 parent
b626aa9
commit ee1f1ac
Showing
4 changed files
with
270 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
/* | ||
* Copyright (c) 2002-2020 "Neo4j," | ||
* Neo4j Sweden AB [http://neo4j.com] | ||
* | ||
* This file is part of Neo4j. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package neo4j | ||
|
||
import ( | ||
"context" | ||
"reflect" | ||
"testing" | ||
"time" | ||
|
||
"github.com/neo4j/neo4j-go-driver/neo4j/internal/db" | ||
"github.com/neo4j/neo4j-go-driver/neo4j/internal/pool" | ||
) | ||
|
||
func assertErrorEq(t *testing.T, err1, err2 error) { | ||
if !reflect.DeepEqual(err1, err2) { | ||
t.Errorf("Wrong type of error, '%s' != '%s'", err1, err2) | ||
} | ||
} | ||
|
||
// Fake implementation of router | ||
type testRouter struct { | ||
readers []string | ||
writers []string | ||
err error | ||
} | ||
|
||
func (r *testRouter) Readers(database string) ([]string, error) { | ||
return r.readers, r.err | ||
} | ||
|
||
func (r *testRouter) Writers(database string) ([]string, error) { | ||
return r.writers, r.err | ||
} | ||
|
||
func (r *testRouter) Invalidate(database string) { | ||
} | ||
|
||
func (r *testRouter) CleanUp() { | ||
} | ||
|
||
// Fake implementation of connection pool | ||
type testPool struct { | ||
borrowConn pool.Connection | ||
err error | ||
returnHook func() | ||
} | ||
|
||
func (p *testPool) Borrow(ctx context.Context, serverNames []string, wait bool) (pool.Connection, error) { | ||
return p.borrowConn, p.err | ||
} | ||
|
||
func (p *testPool) Return(c pool.Connection) { | ||
if p.returnHook != nil { | ||
p.returnHook() | ||
} | ||
} | ||
|
||
func (p *testPool) CleanUp() { | ||
} | ||
|
||
// Fake implementation of db connection | ||
type testConn struct { | ||
err error | ||
txBeginHandle db.Handle | ||
runStream *db.Stream | ||
runTxStream *db.Stream | ||
nextRecord *db.Record | ||
nextSummary *db.Summary | ||
bookmark string | ||
serverName string | ||
serverVersion string | ||
isAlive bool | ||
birthDate time.Time | ||
txCommitErr error | ||
txRollbackErr error | ||
resetHook func() | ||
} | ||
|
||
func (c *testConn) TxBegin(mode db.AccessMode, bookmarks []string, timeout time.Duration, meta map[string]interface{}) (db.Handle, error) { | ||
return c.txBeginHandle, c.err | ||
} | ||
|
||
func (c *testConn) TxRollback(tx db.Handle) error { | ||
return c.txRollbackErr | ||
} | ||
|
||
func (c *testConn) TxCommit(tx db.Handle) error { | ||
return c.txCommitErr | ||
} | ||
|
||
func (c *testConn) Run(cypher string, params map[string]interface{}, mode db.AccessMode, bookmarks []string, timeout time.Duration, meta map[string]interface{}) (*db.Stream, error) { | ||
return c.runStream, c.err | ||
} | ||
|
||
func (c *testConn) RunTx(tx db.Handle, cypher string, params map[string]interface{}) (*db.Stream, error) { | ||
return c.runTxStream, c.err | ||
} | ||
|
||
func (c *testConn) Next(streamHandle db.Handle) (*db.Record, *db.Summary, error) { | ||
return c.nextRecord, c.nextSummary, c.err | ||
} | ||
|
||
func (c *testConn) Bookmark() string { | ||
return c.bookmark | ||
} | ||
|
||
func (c *testConn) ServerName() string { | ||
return c.serverName | ||
} | ||
|
||
func (c *testConn) ServerVersion() string { | ||
return c.serverVersion | ||
} | ||
|
||
func (c *testConn) IsAlive() bool { | ||
return c.isAlive | ||
} | ||
|
||
func (c *testConn) Birthdate() time.Time { | ||
return c.birthDate | ||
} | ||
|
||
func (c *testConn) Reset() { | ||
if c.resetHook != nil { | ||
c.resetHook() | ||
} | ||
} | ||
|
||
func (c *testConn) Close() { | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/* | ||
* Copyright (c) 2002-2020 "Neo4j," | ||
* Neo4j Sweden AB [http://neo4j.com] | ||
* | ||
* This file is part of Neo4j. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package neo4j | ||
|
||
import ( | ||
"errors" | ||
"testing" | ||
"time" | ||
|
||
"github.com/neo4j/neo4j-go-driver/neo4j/internal/db" | ||
"github.com/neo4j/neo4j-go-driver/neo4j/internal/log" | ||
) | ||
|
||
func TestSession(st *testing.T) { | ||
logger := log.ConsoleLogger{Errors: true, Infos: true, Warns: false} | ||
|
||
assertCleanSessionState := func(t *testing.T, sess *session) { | ||
if sess.inTx { | ||
t.Errorf("Session should not be in tx mode") | ||
} | ||
} | ||
|
||
createSession := func() (*testRouter, *testPool, *session) { | ||
conf := Config{MaxTransactionRetryTime: 3 * time.Millisecond} | ||
router := testRouter{} | ||
pool := testPool{} | ||
sess := newSession(&conf, &router, &pool, db.ReadMode, []string{}, "", &logger) | ||
sess.throttleTime = time.Millisecond * 1 | ||
return &router, &pool, sess | ||
} | ||
|
||
st.Run("Retry mechanism", func(rt *testing.T) { | ||
// Checks that retries occur on database error and that it stops retrying after a certain | ||
// amount of time and that connections are returned to pool upon failure. | ||
rt.Run("Consistent transient error", func(t *testing.T) { | ||
_, pool, sess := createSession() | ||
numReturns := 0 | ||
pool.returnHook = func() { | ||
numReturns++ | ||
} | ||
conn := &testConn{isAlive: true} | ||
pool.borrowConn = conn | ||
transientErr := &db.DatabaseError{Code: "Neo.TransientError.General.MemoryPoolOutOfMemoryError"} | ||
numRetries := 0 | ||
|
||
_, err := sess.WriteTransaction(func(tx Transaction) (interface{}, error) { | ||
// Previous connection should be returned to pool since it failed | ||
if numRetries > 0 && numReturns != numRetries { | ||
t.Errorf("Should have returned previous connection to pool") | ||
} | ||
numRetries++ | ||
return nil, transientErr | ||
}) | ||
|
||
if numRetries < 2 { | ||
t.Errorf("Should have retried at least once but executed %d", numRetries) | ||
} | ||
assertErrorEq(t, transientErr, err) | ||
assertCleanSessionState(t, sess) | ||
}) | ||
|
||
// Checks that session is in clean state after connection fails to rollback. | ||
// "User" initiates rollback by letting the transaction function return a custom error. | ||
rt.Run("Failed rollback", func(t *testing.T) { | ||
_, pool, sess := createSession() | ||
rollbackErr := errors.New("Rollback error") | ||
causeOfRollbackErr := errors.New("Cause of rollback") | ||
pool.borrowConn = &testConn{isAlive: true, txRollbackErr: rollbackErr} | ||
_, err := sess.WriteTransaction(func(tx Transaction) (interface{}, error) { | ||
return nil, causeOfRollbackErr | ||
}) | ||
assertErrorEq(t, causeOfRollbackErr, err) | ||
assertCleanSessionState(t, sess) | ||
}) | ||
|
||
// Check that sesssion is in clean state after connection fails to commit. | ||
rt.Run("Failed commit", func(t *testing.T) { | ||
_, pool, sess := createSession() | ||
commitErr := errors.New("Commit error") | ||
pool.borrowConn = &testConn{isAlive: true, txCommitErr: commitErr} | ||
_, err := sess.WriteTransaction(func(tx Transaction) (interface{}, error) { | ||
return nil, nil | ||
}) | ||
assertErrorEq(t, commitErr, err) | ||
assertCleanSessionState(t, sess) | ||
}) | ||
}) | ||
} |