Skip to content

Commit

Permalink
The CombinerExecutor class creates an instance of FastThreadLocal for…
Browse files Browse the repository at this point in the history
… each combiner executor leading to an increase of the InternalThreadLocalMap index. Consequently each thread local map of FastThreadLocalThread will get a new map sized accordingly, leading to an eventual memory leak.

Use a static FastThreadLocal in CombinerExecutor instead of a instance field, the data structure stored in this thread local map keeps track of the CombinerExecutor running in order to allow interleaved execution of CombinerExecutor post tasks without interfering each other. The structure is optimized for the most frequent case.
  • Loading branch information
vietj committed Jan 29, 2024
1 parent 32ae3bc commit dd6f643
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 22 deletions.
49 changes: 39 additions & 10 deletions src/main/java/io/vertx/core/net/impl/pool/CombinerExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -29,11 +31,19 @@ public class CombinerExecutor<S> implements Executor<S> {
private final AtomicInteger s = new AtomicInteger();
private final S state;

protected static final class InProgressTail {
protected static final class InProgressTail<S> {

final CombinerExecutor<S> combiner;
Task task;
Map<CombinerExecutor<S>, Task> others;

public InProgressTail(CombinerExecutor<S> combiner, Task task) {
this.combiner = combiner;
this.task = task;
}
}

private final FastThreadLocal<InProgressTail> current = new FastThreadLocal<>();
private static final FastThreadLocal<InProgressTail<?>> current = new FastThreadLocal<>();

public CombinerExecutor(S state) {
this.state = state;
Expand Down Expand Up @@ -72,23 +82,42 @@ public void submit(Action<S> action) {
}
} while (!q.isEmpty() && s.compareAndSet(0, 1));
if (head != null) {
InProgressTail inProgress = current.get();
InProgressTail<S> inProgress = (InProgressTail<S>) current.get();
if (inProgress == null) {
inProgress = new InProgressTail();
inProgress = new InProgressTail<>(this, tail);
current.set(inProgress);
inProgress.task = tail;
try {
// from now one cannot trust tail anymore
head.runNextTasks();
assert inProgress.others == null || inProgress.others.isEmpty();
} finally {
current.remove();
}
} else {
assert inProgress.task != null;
Task oldNextTail = inProgress.task.replaceNext(head);
assert oldNextTail == null;
inProgress.task = tail;

if (inProgress.combiner == this) {
Task oldNextTail = inProgress.task.replaceNext(head);
assert oldNextTail == null;
inProgress.task = tail;
} else {
Map<CombinerExecutor<S>, Task> map = inProgress.others;
if (map == null) {
map = inProgress.others = new HashMap<>(1);
}
Task task = map.get(this);
if (task == null) {
map.put(this, tail);
try {
// from now one cannot trust tail anymore
head.runNextTasks();
} finally {
map.remove(this);
}
} else {
Task oldNextTail = task.replaceNext(head);
assert oldNextTail == null;
map.put(this, tail);
}
}
}
}
}
Expand Down
79 changes: 74 additions & 5 deletions src/test/java/io/vertx/core/net/impl/pool/ConnectionPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.WorkerContext;
import io.vertx.test.core.VertxTestBase;
import org.junit.Ignore;
import org.junit.Test;

import java.util.*;
Expand All @@ -31,10 +30,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ConnectionPoolTest extends VertxTestBase {

Expand Down Expand Up @@ -962,14 +959,14 @@ public void testPostTasksTrampoline() throws Exception {
List<Integer> res = Collections.synchronizedList(new LinkedList<>());
AtomicInteger seq = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(numAcquires);
int[] count = new int[1];
ConnectionPool<Connection> pool = ConnectionPool.pool(new PoolConnector<Connection>() {
int count = 0;
int reentrancy = 0;
@Override
public void connect(EventLoopContext context, Listener listener, Handler<AsyncResult<ConnectResult<Connection>>> handler) {
assertEquals(0, reentrancy++);
try {
int val = count++;
int val = count[0]++;
if (val == 0) {
// Queue extra requests
for (int i = 0;i < numAcquires;i++) {
Expand All @@ -979,6 +976,7 @@ public void connect(EventLoopContext context, Listener listener, Handler<AsyncRe
latch.countDown();
}));
}
assertEquals(1, count[0]);
}
handler.handle(Future.failedFuture("failure"));
} finally {
Expand All @@ -994,10 +992,81 @@ public boolean isValid(Connection connection) {
int num = seq.getAndIncrement();
pool.acquire(ctx, 0, onFailure(err -> res.add(num)));
awaitLatch(latch);
assertEquals(1 + numAcquires, count[0]);
List<Integer> expected = IntStream.range(0, numAcquires + 1).boxed().collect(Collectors.toList());
assertEquals(expected, res);
}

@Test
public void testConcurrentPostTasksTrampoline() throws Exception {
AtomicReference<ConnectionPool<Connection>> ref1 = new AtomicReference<>();
AtomicReference<ConnectionPool<Connection>> ref2 = new AtomicReference<>();
EventLoopContext ctx = vertx.createEventLoopContext();
List<Integer> res = Collections.synchronizedList(new LinkedList<>());
CountDownLatch latch = new CountDownLatch(4);
ConnectionPool<Connection> pool1 = ConnectionPool.pool(new PoolConnector<Connection>() {
int count = 0;
int reentrancy = 0;
@Override
public void connect(EventLoopContext context, Listener listener, Handler<AsyncResult<ConnectResult<Connection>>> handler) {
assertEquals(0, reentrancy++);
try {
int val = count++;
if (val == 0) {
ref1.get().acquire(ctx, 0, onFailure(err -> {
res.add(1);
latch.countDown();
}));
ref2.get().acquire(ctx, 0, onFailure(err -> {
res.add(2);
latch.countDown();
}));
}
handler.handle(Future.failedFuture("failure"));
} finally {
reentrancy--;
}
}
@Override
public boolean isValid(Connection connection) {
return true;
}
}, new int[]{1}, 2);
ConnectionPool<Connection> pool2 = ConnectionPool.pool(new PoolConnector<Connection>() {
int count = 0;
int reentrancy = 0;
@Override
public void connect(EventLoopContext context, Listener listener, Handler<AsyncResult<ConnectResult<Connection>>> handler) {
assertEquals(0, reentrancy++);
try {
int val = count++;
if (val == 0) {
ref2.get().acquire(ctx, 0, onFailure(err -> {
res.add(3);
latch.countDown();
}));
ref1.get().acquire(ctx, 0, onFailure(err -> {
res.add(4);
latch.countDown();
}));
}
handler.handle(Future.failedFuture("failure"));
} finally {
reentrancy--;
}
}
@Override
public boolean isValid(Connection connection) {
return true;
}
}, new int[]{1}, 2);
ref1.set(pool1);
ref2.set(pool2);
pool1.acquire(ctx, 0, onFailure(err -> res.add(0)));
awaitLatch(latch);
// assertEquals(Arrays.asList(0, 2, 1, 3, 4), res);
}

static class Connection {
public Connection() {
}
Expand Down
73 changes: 66 additions & 7 deletions src/test/java/io/vertx/core/net/impl/pool/SynchronizationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@
*/
package io.vertx.core.net.impl.pool;

import io.netty.util.concurrent.FastThreadLocal;
import io.vertx.test.core.AsyncTestBase;
import org.junit.Test;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -67,13 +71,55 @@ public void run() {
}

@Test
public void testFoo() throws Exception {
public void testActionReentrancy2() throws Exception {
List<Integer> log = new LinkedList<>();
Executor<Object> combiner1 = new CombinerExecutor<>(new Object());
Executor<Object> combiner2 = new CombinerExecutor<>(new Object());
int[] reentrancy = new int[2];
combiner1.submit(state1 -> taskOf(() -> {
assertEquals(0, reentrancy[0]++);
combiner1.submit(state2 -> taskOf(() -> {
assertEquals(0, reentrancy[0]++);
log.add(0);
reentrancy[0]--;
}));
combiner2.submit(state2 -> taskOf(() -> {
assertEquals(0, reentrancy[1]++);
log.add(1);
combiner1.submit(state3 -> taskOf(() -> {
assertEquals(0, reentrancy[0]++);
log.add(2);
reentrancy[0]--;
}));
combiner2.submit(state3 -> taskOf(() -> {
assertEquals(0, reentrancy[1]++);
log.add(3);
reentrancy[1]--;
}));
reentrancy[1]--;
}));
reentrancy[0]--;
}));
assertEquals(0, reentrancy[0]);
assertEquals(0, reentrancy[1]);
assertEquals(Arrays.asList(1, 3, 0, 2), log);
}

static Task taskOf(Runnable runnable) {
return new Task() {
@Override
public void run() {
runnable.run();
}
};
}

@Test
public void testFoo() throws Exception {
int numThreads = 8;
int numIter = 1_000 * 100;
Executor<Object> sync = new CombinerExecutor<>(new Object());
Executor.Action action = s -> {
Executor.Action<Object> action = s -> {
burnCPU(10);
return null;
};
Expand All @@ -96,11 +142,6 @@ public void testFoo() throws Exception {
}
}






public static class Utils {
public static long res = 0; // value sink
public static long ONE_MILLI_IN_NANO = 1000000;
Expand Down Expand Up @@ -170,4 +211,22 @@ public void run() {
});
assertEquals(3, order.get());
}

@Test
public void testFastThreadLocalStability() {
CombinerExecutor<Void> executor = new CombinerExecutor<>(null);
int expected = io.netty.util.internal.InternalThreadLocalMap.lastVariableIndex();
AtomicInteger counter = new AtomicInteger();
for (int i = 0;i < 1000;i++) {
executor = new CombinerExecutor<>(null);
executor.submit(state -> new Task() {
@Override
public void run() {
counter.incrementAndGet();
}
});
assertEquals(i + 1, counter.get());
}
assertEquals(expected, io.netty.util.internal.InternalThreadLocalMap.lastVariableIndex());
}
}

0 comments on commit dd6f643

Please sign in to comment.