Skip to content

Commit

Permalink
The pool asynchronously removes connection slots when they are identi…
Browse files Browse the repository at this point in the history
…fied as removable. This create races, e.g a connection can be evicted from the pool and then acquired before the removal happens.

Connection slots are now removed synchronously when the pool identifies connections to be removed immediately within the pool logic. The post action removal are now enqueued in the main post action (when there is one), the combiner executor has been modified to take in account the fact that a post action might next actions to execute.
  • Loading branch information
vietj committed Aug 3, 2023
1 parent b2b520d commit fa45c5e
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ public void submit(Action<S> action) {
if (a == null) {
break;
}
Task task = a.execute(state);
final Task task = a.execute(state);
if (task != null) {
if (head == null) {
assert tail == null;
tail = task;
for (Task next = tail.next();next != null;next = tail.next()) {
tail = tail.next();
}
head = task;
} else {
tail = tail.next(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ public void submit(Action<S> action) {
post = action.execute(state);
} finally {
lock.unlock();
if (post != null) {
while (post != null) {
post.run();
post = post.next();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,7 @@ public Task execute(SimpleConnectionPool<C> pool) {
} else {
waiter.disposed = true;
}
if (!pool.closed) {
pool.remove(removed);
}
return new Task() {
Task task = new Task() {
@Override
public void run() {
if (waiter != null) {
Expand All @@ -386,6 +383,14 @@ public void run() {
removed.result.fail(cause);
}
};
if (!pool.closed) {
Task removeTask = new Remove<>(removed).execute(pool);
if (removeTask != null) {
removeTask.next(task);
task = removeTask;
}
}
return task;
}
}

Expand Down Expand Up @@ -522,15 +527,21 @@ public void run() {
res.add(slot.connection);
}
}
for (Slot<C> slot : removed) {
pool.remove(slot);
}
return new Task() {
Task head = new Task() {
@Override
public void run() {
handler.handle(Future.succeededFuture(res));
}
};
Task tail = head;
for (Slot<C> slot : removed) {
Task next = new Remove<>(slot).execute(pool);
if (next != null) {
tail.next(next);
tail = next;
}
}
return head;
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/vertx/core/net/impl/pool/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public Task replaceNext(Task next) {
return oldNext;
}

public Task next() {
return next;
}

public Task next(Task next) {
this.next = next;
return next;
Expand Down
34 changes: 34 additions & 0 deletions src/test/java/io/vertx/core/net/impl/pool/ConnectionPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,40 @@ public void testRemoveEvicted() throws Exception {
assertEquals(0, pool.size());
}

@Test
public void testSynchronousEviction() throws Exception {
ConnectionManager mgr = new ConnectionManager();
ConnectionPool<Connection> pool = ConnectionPool.pool(mgr, new int[] { 1 }, 1);
EventLoopContext ctx = vertx.createEventLoopContext();
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
pool.acquire(ctx, 0, onSuccess(lease -> {
lease.recycle();
latch1.countDown();
}));
ConnectionRequest request = mgr.assertRequest();
Connection conn1 = new Connection();
request.connect(conn1, 0);
awaitLatch(latch1);
Connection conn2 = new Connection();
pool.evict(candidate -> {
assertSame(candidate, conn1);
pool.acquire(ctx, 0, onSuccess(lease -> {
Connection c2 = lease.get();
assertSame(conn2, c2);
latch3.countDown();
}));
return true;
}, onSuccess(list -> {
latch2.countDown();
}));
awaitLatch(latch2);
request = mgr.assertRequest();
request.connect(conn2, 0);
awaitLatch(latch3);
}

@Test
public void testConnectionInProgressShouldNotBeEvicted() {
ConnectionManager mgr = new ConnectionManager();
Expand Down

0 comments on commit fa45c5e

Please sign in to comment.