Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fix ExecutorCompletionService workaround in ChunkProcessingPipeline #5128

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2023 The Terasology Foundation
// SPDX-License-Identifier: Apache-2.0

package org.terasology.engine.world.chunks.pipeline;

import org.joml.Vector3i;
import org.joml.Vector3ic;
import org.terasology.engine.world.chunks.Chunk;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* A specialised alternative to {@link java.util.concurrent.ExecutorCompletionService},
* used for submitting chunk tasks and queuing their results.
*
* Whilst this class adheres to the {@link CompletionService} interface, use of the class's
* {@link #submit(Callable, Vector3ic)} overload is preferred over those inherited from the interface.
*/
public class ChunkExecutorCompletionService implements CompletionService<Chunk> {
private static final Vector3ic EMPTY_VECTOR3I = new Vector3i();
private final ThreadPoolExecutor threadPoolExecutor;
private final BlockingQueue<Future<Chunk>> completionQueue;

private final class ChunkFutureWithCompletion extends PositionFuture<Chunk> {
ChunkFutureWithCompletion(Callable callable, Vector3ic position) {
super(callable, position);
}

ChunkFutureWithCompletion(Runnable runnable, Chunk result, Vector3ic position) {
super(runnable, result, position);
}

@Override
protected void done() {
super.done();
try {
completionQueue.put(this);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

public ChunkExecutorCompletionService(ThreadPoolExecutor threadPoolExecutor, BlockingQueue<Future<Chunk>> completionQueue) {
this.threadPoolExecutor = threadPoolExecutor;
this.completionQueue = completionQueue;
}

/**
* Submits a task to be executed.
* @param callable the task to submit
*
* @deprecated Use {@link #submit(Callable, Vector3ic)} instead
*/
@Override
@Deprecated
public Future<Chunk> submit(Callable<Chunk> callable) {
RunnableFuture<Chunk> task = new ChunkFutureWithCompletion(callable, EMPTY_VECTOR3I);
threadPoolExecutor.execute(task);
return task;
}

/**
* Submits a chunk task to be executed.
* @param callable the chunk task to execute.
* @param position the position of the chunk.
* @return the submitted task.
*/
public Future<Chunk> submit(Callable<Chunk> callable, Vector3ic position) {
RunnableFuture<Chunk> task = new ChunkFutureWithCompletion(callable, position);
threadPoolExecutor.execute(task);
return task;
}

/**
* Submits a task to be executed.
* @param runnable the task to run.
* @param value the value to return upon task completion.
*
* @deprecated Use {@link #submit(Callable, Vector3ic)} instead
*/
@Override
@Deprecated
public Future<Chunk> submit(Runnable runnable, Chunk value) {
RunnableFuture<Chunk> task = new ChunkFutureWithCompletion(runnable, value, EMPTY_VECTOR3I);
threadPoolExecutor.execute(task);
return task;
}

/**
* Retrieves a completed task from the queue.
* @return a completed task.
* @throws InterruptedException if interrupted whilst waiting on the queue.
*/
@Override
public Future<Chunk> take() throws InterruptedException {
return completionQueue.take();
}

/**
* Retrieves a completed task from the queue if not empty.
* @return a completed task, or null if there are no tasks in the queue.
*/
@Override
public Future<Chunk> poll() {
return completionQueue.poll();
}

/**
* Retrieves a completed task from the queue if not empty.
* @param l the timeout duration before returning null.
* @param timeUnit the time units of the timeout duration.
*
* @return a completed task, or null if there are no tasks in the queue.
*/
@Override
public Future<Chunk> poll(long l, TimeUnit timeUnit) throws InterruptedException {
return completionQueue.poll(l, timeUnit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,17 @@
import org.slf4j.LoggerFactory;
import org.terasology.engine.monitoring.ThreadActivity;
import org.terasology.engine.monitoring.ThreadMonitor;
import org.terasology.engine.utilities.ReflectionUtil;
import org.terasology.engine.world.chunks.Chunk;
import org.terasology.engine.world.chunks.pipeline.stages.ChunkTask;
import org.terasology.engine.world.chunks.pipeline.stages.ChunkTaskProvider;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand All @@ -50,7 +45,7 @@ public class ChunkProcessingPipeline {

private final List<ChunkTaskProvider> stages = Lists.newArrayList();
private final Thread reactor;
private final CompletionService<Chunk> chunkProcessor;
private final ChunkExecutorCompletionService chunkProcessor;
private final ThreadPoolExecutor executor;
private final Function<Vector3ic, Chunk> chunkProvider;
private final Map<Vector3ic, ChunkProcessingInfo> chunkProcessingInfoMap = Maps.newConcurrentMap();
Expand All @@ -66,36 +61,18 @@ public ChunkProcessingPipeline(Function<Vector3ic, Chunk> chunkProvider, Compara
NUM_TASK_THREADS,
NUM_TASK_THREADS, 0L,
TimeUnit.MILLISECONDS,
new PriorityBlockingQueue(800, unwrappingComporator(comparable)),
new PriorityBlockingQueue(800, comparable),
this::threadFactory,
this::rejectQueueHandler) {
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
return new PositionFuture<>(newTaskFor, ((PositionalCallable) callable).getPosition());
}
};
this::rejectQueueHandler);
logger.debug("allocated {} threads", NUM_TASK_THREADS);
chunkProcessor = new ExecutorCompletionService<>(executor,
chunkProcessor = new ChunkExecutorCompletionService(executor,
new PriorityBlockingQueue<>(800, comparable));
reactor = new Thread(this::chunkTaskHandler);
reactor.setDaemon(true);
reactor.setName("Chunk-Processing-Reactor");
reactor.start();
}

/**
* BlackMagic method: {@link ExecutorCompletionService} wraps task with QueueingFuture (private access)
* there takes wrapped task for comparing in {@link ThreadPoolExecutor}
*/
private Comparator unwrappingComporator(Comparator<Future<Chunk>> comparable) {
return (o1, o2) -> {
Object unwrapped1 = ReflectionUtil.readField(o1, "task");
Object unwrapped2 = ReflectionUtil.readField(o2, "task");
return comparable.compare((Future<Chunk>) unwrapped1, (Future<Chunk>) unwrapped2);
};
}

/**
* Reactor thread. Handles all ChunkTask dependency logic and running.
*/
Expand Down Expand Up @@ -190,11 +167,11 @@ private Chunk getChunkBy(ChunkTaskProvider requiredStage, Vector3ic position) {
}

private Future<Chunk> runTask(ChunkTask task, List<Chunk> chunks) {
return chunkProcessor.submit(new PositionalCallable(() -> {
return chunkProcessor.submit(() -> {
try (ThreadActivity ignored = ThreadMonitor.startThreadActivity(task.getName())) {
return task.apply(chunks);
}
}, task.getPosition()));
}, task.getPosition());
}

private Thread threadFactory(Runnable runnable) {
Expand Down Expand Up @@ -236,8 +213,7 @@ public ListenableFuture<Chunk> invokeGeneratorTask(Vector3ic position, Supplier<
SettableFuture<Chunk> exitFuture = SettableFuture.create();
chunkProcessingInfo = new ChunkProcessingInfo(position, exitFuture);
chunkProcessingInfoMap.put(position, chunkProcessingInfo);
chunkProcessingInfo.setCurrentFuture(chunkProcessor.submit(new PositionalCallable(generatorTask::get,
position)));
chunkProcessingInfo.setCurrentFuture(chunkProcessor.submit(generatorTask::get, position));
return exitFuture;
}
}
Expand Down Expand Up @@ -317,26 +293,4 @@ public boolean isPositionProcessing(Vector3ic pos) {
public Iterable<Vector3ic> getProcessingPosition() {
return chunkProcessingInfoMap.keySet();
}

/**
* Dummy callable for passthru position for {@link java.util.concurrent.ThreadPoolExecutor}#newTaskFor
*/
private static final class PositionalCallable implements Callable<Chunk> {
private final Callable<Chunk> callable;
private final Vector3ic position;

private PositionalCallable(Callable<Chunk> callable, Vector3ic position) {
this.callable = callable;
this.position = position;
}

public Vector3ic getPosition() {
return position;
}

@Override
public Chunk call() throws Exception {
return callable.call();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,23 @@

import org.joml.Vector3ic;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class PositionFuture<T> implements RunnableFuture<T> {

private final RunnableFuture<T> delegate;
public class PositionFuture<T> extends FutureTask<T> {
private final Vector3ic position;

public PositionFuture(RunnableFuture<T> delegate, Vector3ic position) {
this.delegate = delegate;
public PositionFuture(Callable callable, Vector3ic position) {
super(callable);
this.position = position;
}

public Vector3ic getPosition() {
return position;
}

@Override
public void run() {
delegate.run();
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return delegate.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return delegate.isCancelled();
}

@Override
public boolean isDone() {
return delegate.isDone();
}

@Override
public T get() throws InterruptedException, ExecutionException {
return delegate.get();
public PositionFuture(Runnable runnable, T result, Vector3ic position) {
super(runnable, result);
this.position = position;
}

@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
return delegate.get(timeout, unit);
public Vector3ic getPosition() {
return position;
}
}