Skip to content

Commit

Permalink
Fix maxConnectionPoolSize verification (#1216)
Browse files Browse the repository at this point in the history
* Fix `maxConnectionPoolSize` verification

Under high load of sessions, the connection pool is allowing new
connections to be created.
This is happening because, while the development of `AuthTokenManager`
and `connection liveness check`,
the methods do validate connection on acquired and return to the pool
need to be async.
This changes creates a situation of racing condition which doesn't exist
the original code and this introduces this bug.

Increasing the resouce acquired count before validating the connection
solves the issue, since the race condition is removed.
However, the pool needs also to release the resource when the validation
return `false` or fails for some reason.
This is important to avoid broken connection still be count for the max
pool size.

Co-Authored-By: Max Gustafsson <max.gustafsson@neo4j.com>

* Adjusting test for be more resilient

The test were to tight and any slowness might cause the test to fail.
Increase the time waiting for the connection get release to pool reduces
the likehood of this problem happens.

---------

Co-authored-by: Max Gustafsson <max.gustafsson@neo4j.com>
  • Loading branch information
bigmontz and MaxAake authored Sep 3, 2024
1 parent c517fa9 commit 720e5c6
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 5 deletions.
17 changes: 15 additions & 2 deletions packages/core/src/internal/pool/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,32 @@ class Pool<R extends unknown = unknown> {
continue
}

resourceAcquired(key, this._activeResourceCounts)

if (this._removeIdleObserver != null) {
this._removeIdleObserver(resource)
}

if (await this._validateOnAcquire(acquisitionContext, resource)) {
let valid = false

try {
valid = await this._validateOnAcquire(acquisitionContext, resource)
} catch (e) {
if (this._log.isErrorEnabled()) {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
this._log.error(`Failure on validate ${resource}. This is a bug, please report it. Caused by: ${e.message}`)
}
}

if (valid) {
// idle resource is valid and can be acquired
resourceAcquired(key, this._activeResourceCounts)
if (this._log.isDebugEnabled()) {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
this._log.debug(`${resource} acquired from the pool ${key}`)
}
return { resource, pool }
} else {
resourceReleased(key, this._activeResourceCounts)
pool.removeInUse(resource)
await this._destroy(resource)
}
Expand Down
135 changes: 134 additions & 1 deletion packages/core/test/internal/pool/pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,139 @@ describe('#unit Pool', () => {
expect(conns.length).toEqual(1)
})

it('should count connection on validation process when eval max pool size', async () => {
const conns: any[] = []
const pool = new Pool<any>({
// Hook into connection creation to track when and what connections that are
// created.
create: async (_, server, release) => {
// Create a fake connection that makes it possible control when it's connected
// and released from the outer scope.
const conn: any = {
server,
release
}
conns.push(conn)
return conn
},
validateOnAcquire: async (context, resource: any) => {
const promise = new Promise<boolean>((resolve, reject) => {
if (resource.promises == null) {
resource.promises = []
}
resource.promises.push({
resolve,
reject
})
})

return await promise
},
// Setup pool to only allow one connection
config: new PoolConfig(1, 100000)
})

// Make the first request for a connection, this will return a connection instantaneously
const conn0 = await pool.acquire({}, address)
expect(conns.length).toEqual(1)

// Releasing connection back to the pool, so it can be re-acquired.
await conn0.release(address, conn0)

// Request the same connection again, it will wait until resolve get called.
const req0 = pool.acquire({}, address)
expect(conns.length).toEqual(1)

// Request other connection, this should also resolve the same connection1.
const req1 = pool.acquire({}, address)
expect(conns.length).toEqual(1)

// connection 1 is valid
conns[0].promises[0].resolve(true)

// getting the connection 1
const conn1 = await req0
expect(conn0).toBe(conn1)
await conn1.release(address, conn1)

// connection 2 is valid
conns[0].promises[1].resolve(true)

// getting the connection 2
const conn2 = await req1
expect(conn0).toBe(conn2)
await conn2.release(address, conn2)
})

it.each([
['is not valid', (promise: any) => promise.resolve(false)],
['validation fails', (promise: any) => promise.reject(new Error('failed'))]
])('should create new connection if the current one when %s', async (_, resolver) => {
const conns: any[] = []
const pool = new Pool<any>({
// Hook into connection creation to track when and what connections that are
// created.
create: async (_, server, release) => {
// Create a fake connection that makes it possible control when it's connected
// and released from the outer scope.
const conn: any = {
server,
release
}
conns.push(conn)
return conn
},
validateOnAcquire: async (context, resource: any) => {
const promise = new Promise<boolean>((resolve, reject) => {
if (resource.promises == null) {
resource.promises = []
}
resource.promises.push({
resolve,
reject
})
})

return await promise
},
// Setup pool to only allow one connection
config: new PoolConfig(1, 100000)
})

// Make the first request for a connection, this will return a connection instantaneously
const conn0 = await pool.acquire({}, address)
expect(conns.length).toEqual(1)

// Releasing connection back to the pool, so it can be re-acquired.
await conn0.release(address, conn0)

// Request the same connection again, it will wait until resolve get called.
const req0 = pool.acquire({}, address)
expect(conns.length).toEqual(1)

// Request other connection, this should also resolve the same connection2.
const req1 = pool.acquire({}, address)
expect(conns.length).toEqual(1)

// should resolve the promise with the configured value
resolver(conns[0].promises[0])

// getting the connection 1
const conn1 = await req0
expect(conn0).not.toBe(conn1)
await conn1.release(address, conn1)
expect(conns.length).toEqual(2)

// connection 2 is valid
conns[1].promises[0].resolve(true)

// getting the connection 2
const conn2 = await req1
expect(conn1).toBe(conn2)
await conn2.release(address, conn2)
expect(conns.length).toEqual(2)
})

it('should not time out if max pool size is not set', async () => {
let counter = 0

Expand Down Expand Up @@ -1157,7 +1290,7 @@ describe('#unit Pool', () => {
)

const numberOfIdleResourceAfterResourceGetCreated = await new Promise(resolve =>
setTimeout(() => resolve(idleResources(pool, address)), 11))
setTimeout(() => resolve(idleResources(pool, address)), 15))

expect(numberOfIdleResourceAfterResourceGetCreated).toEqual(1)
expect(counter).toEqual(1)
Expand Down
17 changes: 15 additions & 2 deletions packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,32 @@ class Pool<R extends unknown = unknown> {
continue
}

resourceAcquired(key, this._activeResourceCounts)

if (this._removeIdleObserver != null) {
this._removeIdleObserver(resource)
}

if (await this._validateOnAcquire(acquisitionContext, resource)) {
let valid = false

try {
valid = await this._validateOnAcquire(acquisitionContext, resource)
} catch (e) {
if (this._log.isErrorEnabled()) {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
this._log.error(`Failure on validate ${resource}. This is a bug, please report it. Caused by: ${e.message}`)
}
}

if (valid) {
// idle resource is valid and can be acquired
resourceAcquired(key, this._activeResourceCounts)
if (this._log.isDebugEnabled()) {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
this._log.debug(`${resource} acquired from the pool ${key}`)
}
return { resource, pool }
} else {
resourceReleased(key, this._activeResourceCounts)
pool.removeInUse(resource)
await this._destroy(resource)
}
Expand Down

0 comments on commit 720e5c6

Please sign in to comment.