Skip to content

Add transaction with retries API #219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions gulpfile.babel.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,23 +164,17 @@ gulp.task('test', function(cb){

gulp.task('test-nodejs', ['install-driver-into-sandbox'], function () {
return gulp.src('test/**/*.test.js')
.pipe(jasmine({
// reporter: new reporters.JUnitXmlReporter({
// savePath: "build/nodejs-test-reports",
// consolidateAll: false
// }),
includeStackTrace: true
}));
.pipe(jasmine({
includeStackTrace: true,
verbose: true
}));
});

gulp.task('test-boltkit', ['nodejs'], function () {
return gulp.src('test/**/*.boltkit.it.js')
.pipe(jasmine({
// reporter: new reporters.JUnitXmlReporter({
// savePath: "build/nodejs-test-reports",
// consolidateAll: false
// }),
includeStackTrace: true
includeStackTrace: true,
verbose: true
}));
});

Expand Down
6 changes: 3 additions & 3 deletions src/v1/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class Driver {
*/
session(mode, bookmark) {
const sessionMode = Driver._validateSessionMode(mode);
return this._createSession(sessionMode, this._connectionProvider, bookmark);
return this._createSession(sessionMode, this._connectionProvider, bookmark, this._config);
}

static _validateSessionMode(rawMode) {
Expand All @@ -132,8 +132,8 @@ class Driver {
}

//Extension point
_createSession(mode, connectionProvider, bookmark) {
return new Session(mode, connectionProvider, bookmark);
_createSession(mode, connectionProvider, bookmark, config) {
return new Session(mode, connectionProvider, bookmark, config);
}

_driverOnErrorCallback(error) {
Expand Down
10 changes: 10 additions & 0 deletions src/v1/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ let USER_AGENT = "neo4j-javascript/" + VERSION;
* // port, and this is then used to verify the host certificate does not change.
* // This setting has no effect unless TRUST_ON_FIRST_USE is enabled.
* knownHosts:"~/.neo4j/known_hosts",
*
* // The max number of connections that are allowed idle in the pool at any time.
* // Connection will be destroyed if this threshold is exceeded.
* connectionPoolSize: 50,
*
* // Specify the maximum time in milliseconds transactions are allowed to retry via
* // {@link Session#readTransaction()} and {@link Session#writeTransaction()} functions. These functions
* // will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient errors with
* // exponential backoff using initial delay of 1 second. Default value is 30000 which is 30 seconds.
* maxTransactionRetryTime: 30000,
* }
*
* @param {string} url The URL for the Neo4j database, for instance "bolt://localhost"
Expand Down
149 changes: 149 additions & 0 deletions src/v1/internal/transaction-executor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/**
* Copyright (c) 2002-2017 "Neo Technology,","
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../error';

const DEFAULT_MAX_RETRY_TIME_MS = 30 * 1000; // 30 seconds
const DEFAULT_INITIAL_RETRY_DELAY_MS = 1000; // 1 seconds
const DEFAULT_RETRY_DELAY_MULTIPLIER = 2.0;
const DEFAULT_RETRY_DELAY_JITTER_FACTOR = 0.2;

export default class TransactionExecutor {

constructor(maxRetryTimeMs, initialRetryDelayMs, multiplier, jitterFactor) {
this._maxRetryTimeMs = _valueOrDefault(maxRetryTimeMs, DEFAULT_MAX_RETRY_TIME_MS);
this._initialRetryDelayMs = _valueOrDefault(initialRetryDelayMs, DEFAULT_INITIAL_RETRY_DELAY_MS);
this._multiplier = _valueOrDefault(multiplier, DEFAULT_RETRY_DELAY_MULTIPLIER);
this._jitterFactor = _valueOrDefault(jitterFactor, DEFAULT_RETRY_DELAY_JITTER_FACTOR);

this._inFlightTimeoutIds = [];

this._verifyAfterConstruction();
}

execute(transactionCreator, transactionWork) {
return new Promise((resolve, reject) => {
this._executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject);
}).catch(error => {
const retryStartTimeMs = Date.now();
const retryDelayMs = this._initialRetryDelayMs;
return this._retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTimeMs, retryDelayMs);
});
}

close() {
// cancel all existing timeouts to prevent further retries
this._inFlightTimeoutIds.forEach(timeoutId => clearTimeout(timeoutId));
this._inFlightTimeoutIds = [];
}

_retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTime, retryDelayMs) {
const elapsedTimeMs = Date.now() - retryStartTime;

if (elapsedTimeMs > this._maxRetryTimeMs || !TransactionExecutor._canRetryOn(error)) {
return Promise.reject(error);
}

return new Promise((resolve, reject) => {
const nextRetryTime = this._computeDelayWithJitter(retryDelayMs);
const timeoutId = setTimeout(() => {
// filter out this timeoutId when time has come and function is being executed
this._inFlightTimeoutIds = this._inFlightTimeoutIds.filter(id => id !== timeoutId);
this._executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject);
}, nextRetryTime);
// add newly created timeoutId to the list of all in-flight timeouts
this._inFlightTimeoutIds.push(timeoutId);
}).catch(error => {
const nextRetryDelayMs = retryDelayMs * this._multiplier;
return this._retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTime, nextRetryDelayMs);
});
}

_executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject) {
try {
const tx = transactionCreator();
const transactionWorkResult = transactionWork(tx);

// user defined callback is supposed to return a promise, but it might not; so to protect against an
// incorrect API usage we wrap the returned value with a resolved promise; this is effectively a
// validation step without type checks
const resultPromise = Promise.resolve(transactionWorkResult);

resultPromise.then(result => {
if (tx.isOpen()) {
// transaction work returned resolved promise and transaction has not been committed/rolled back
// try to commit the transaction
tx.commit().then(() => {
// transaction was committed, return result to the user
resolve(result);
}).catch(error => {
// transaction failed to commit, propagate the failure
reject(error);
});
} else {
// transaction work returned resolved promise and transaction is already committed/rolled back
// return the result returned by given transaction work
resolve(result);
}
}).catch(error => {
// transaction work returned rejected promise, propagate the failure
reject(error);
});

} catch (error) {
reject(error);
}
}

_computeDelayWithJitter(delayMs) {
const jitter = (delayMs * this._jitterFactor);
const min = delayMs - jitter;
const max = delayMs + jitter;
return Math.random() * (max - min) + min;
}

static _canRetryOn(error) {
return error && error.code &&
(error.code === SERVICE_UNAVAILABLE ||
error.code === SESSION_EXPIRED ||
error.code.indexOf('TransientError') >= 0);
}

_verifyAfterConstruction() {
if (this._maxRetryTimeMs < 0) {
throw newError('Max retry time should be >= 0: ' + this._maxRetryTimeMs);
}
if (this._initialRetryDelayMs < 0) {
throw newError('Initial retry delay should >= 0: ' + this._initialRetryDelayMs);
}
if (this._multiplier < 1.0) {
throw newError('Multiplier should be >= 1.0: ' + this._multiplier);
}
if (this._jitterFactor < 0 || this._jitterFactor > 1) {
throw newError('Jitter factor should be in [0.0, 1.0]: ' + this._jitterFactor);
}
}
};

function _valueOrDefault(value, defaultValue) {
if (value || value === 0) {
return value;
}
return defaultValue;
}
8 changes: 4 additions & 4 deletions src/v1/routing-driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class RoutingDriver extends Driver {
return new LoadBalancer(address, connectionPool, driverOnErrorCallback);
}

_createSession(mode, connectionProvider, bookmark) {
return new RoutingSession(mode, connectionProvider, bookmark, (error, conn) => {
_createSession(mode, connectionProvider, bookmark, config) {
return new RoutingSession(mode, connectionProvider, bookmark, config, (error, conn) => {
if (error.code === SERVICE_UNAVAILABLE || error.code === SESSION_EXPIRED) {
// connection is undefined if error happened before connection was acquired
if (conn) {
Expand Down Expand Up @@ -66,8 +66,8 @@ class RoutingDriver extends Driver {
}

class RoutingSession extends Session {
constructor(mode, connectionProvider, bookmark, onFailedConnection) {
super(mode, connectionProvider, bookmark);
constructor(mode, connectionProvider, bookmark, config, onFailedConnection) {
super(mode, connectionProvider, bookmark, config);
this._onFailedConnection = onFailedConnection;
}

Expand Down
76 changes: 67 additions & 9 deletions src/v1/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import Transaction from './transaction';
import {newError} from './error';
import {assertString} from './internal/util';
import ConnectionHolder from './internal/connection-holder';
import {READ, WRITE} from './driver';
import Driver, {READ, WRITE} from './driver';
import TransactionExecutor from './internal/transaction-executor';

/**
* A Session instance is used for handling the connection and
Expand All @@ -36,15 +37,17 @@ class Session {
* @constructor
* @param {string} mode the default access mode for this session.
* @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from.
* @param {string} bookmark - the initial bookmark for this session.
* @param {string} [bookmark=undefined] - the initial bookmark for this session.
* @param {Object} [config={}] - this driver configuration.
*/
constructor(mode, connectionProvider, bookmark) {
constructor(mode, connectionProvider, bookmark, config) {
this._mode = mode;
this._readConnectionHolder = new ConnectionHolder(READ, connectionProvider);
this._writeConnectionHolder = new ConnectionHolder(WRITE, connectionProvider);
this._open = true;
this._hasTx = false;
this._lastBookmark = bookmark;
this._transactionExecutor = _createTransactionExecutor(config);
}

/**
Expand Down Expand Up @@ -92,32 +95,81 @@ class Session {
* @returns {Transaction} - New Transaction
*/
beginTransaction(bookmark) {
return this._beginTransaction(this._mode, bookmark);
}

_beginTransaction(accessMode, bookmark) {
if (bookmark) {
assertString(bookmark, 'Bookmark');
this._updateBookmark(bookmark);
}

if (this._hasTx) {
throw newError("You cannot begin a transaction on a session with an "
+ "open transaction; either run from within the transaction or use a "
+ "different session.")
throw newError('You cannot begin a transaction on a session with an open transaction; ' +
'either run from within the transaction or use a different session.');
}

this._hasTx = true;

const connectionHolder = this._connectionHolderWithMode(this._mode);
const mode = Driver._validateSessionMode(accessMode);
const connectionHolder = this._connectionHolderWithMode(mode);
connectionHolder.initializeConnection();
this._hasTx = true;

return new Transaction(connectionHolder, () => {
this._hasTx = false;
},
this._onRunFailure(), this._lastBookmark, this._updateBookmark.bind(this));
}

/**
* Return the bookmark received following the last completed {@link Transaction}.
*
* @return a reference to a previous transac'tion
*/
lastBookmark() {
return this._lastBookmark;
}

/**
* Execute given unit of work in a {@link Driver#READ} transaction.
*
* Transaction will automatically be committed unless the given function throws or returns a rejected promise.
* Some failures of the given function or the commit itself will be retried with exponential backoff with initial
* delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
* <code>maxTransactionRetryTime</code> property in milliseconds.
*
* @param {function(Transaction)} transactionWork - callback that executes operations against
* a given {@link Transaction}.
* @return {Promise} resolved promise as returned by the given function or rejected promise when given
* function or commit fails.
*/
readTransaction(transactionWork) {
return this._runTransaction(READ, transactionWork);
}

/**
* Execute given unit of work in a {@link Driver#WRITE} transaction.
*
* Transaction will automatically be committed unless the given function throws or returns a rejected promise.
* Some failures of the given function or the commit itself will be retried with exponential backoff with initial
* delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
* <code>maxTransactionRetryTime</code> property in milliseconds.
*
* @param {function(Transaction)} transactionWork - callback that executes operations against
* a given {@link Transaction}.
* @return {Promise} resolved promise as returned by the given function or rejected promise when given
* function or commit fails.
*/
writeTransaction(transactionWork) {
return this._runTransaction(WRITE, transactionWork);
}

_runTransaction(accessMode, transactionWork) {
return this._transactionExecutor.execute(
() => this._beginTransaction(accessMode, this.lastBookmark()),
transactionWork
);
}

_updateBookmark(newBookmark) {
if (newBookmark) {
this._lastBookmark = newBookmark;
Expand All @@ -132,6 +184,7 @@ class Session {
close(callback = (() => null)) {
if (this._open) {
this._open = false;
this._transactionExecutor.close();
this._readConnectionHolder.close().then(() => {
this._writeConnectionHolder.close().then(() => {
callback();
Expand Down Expand Up @@ -180,4 +233,9 @@ class _RunObserver extends StreamObserver {
}
}

function _createTransactionExecutor(config) {
const maxRetryTimeMs = (config && config.maxTransactionRetryTime) ? config.maxTransactionRetryTime : null;
return new TransactionExecutor(maxRetryTimeMs);
}

export default Session;
Loading