Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/name all java threads #1

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Name all Java threads to aid in debugging ([#23049](https://github.com/apache/beam/issues/23049)).

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Server;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -201,7 +202,12 @@ public static void start(
}
worker.startStatusPages();
worker.start();
ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("ControlClient-thread")
.build());
executor.execute(
() -> { // Task to get new client connections
while (true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
Expand Down Expand Up @@ -96,7 +97,12 @@ public FetchAndFilterStreamingSideInputsOperation(
(StreamingModeExecutionContext.StreamingModeStepContext)
stepContext.namespacedToUser());
this.elementsToProcess = new LinkedBlockingQueue<>();
this.singleThreadExecutor = Executors.newSingleThreadExecutor();
this.singleThreadExecutor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("FetchAndFilterStreamingSideInput-thread")
.build());
}

/** A {@link PCollectionView} which forwards all calls to its delegate. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* Exposes methods to interop with JFR. This is only supported on java 9 and up, java 8 does not
Expand All @@ -35,7 +36,9 @@
*/
class JfrInterop {
// ensure only a single JFR profile is running at once
private static final ExecutorService JFR_EXECUTOR = Executors.newSingleThreadExecutor();
private static final ExecutorService JFR_EXECUTOR =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("JFRprofile-thread").build());

private final Constructor<?> recordingCtor;
private final Method recordingStart;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Verify;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down Expand Up @@ -626,7 +627,12 @@ private abstract class AbstractWindmillStream<RequestT, ResponseT> implements Wi
private final StreamObserverFactory streamObserverFactory =
StreamObserverFactory.direct(streamDeadlineSeconds * 2);
private final Function<StreamObserver<ResponseT>, StreamObserver<RequestT>> clientFactory;
private final Executor executor = Executors.newSingleThreadExecutor();
private final Executor executor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("WindmillStream-thread")
.build());

// The following should be protected by synchronizing on this, except for
// the atomics which may be read atomically for status pages.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class BundleCheckpointHandlers {

/**
* A {@link BundleCheckpointHandler} which uses {@link
* org.apache.beam.runners.core.TimerInternals.TimerData} ans {@link
* org.apache.beam.runners.core.TimerInternals.TimerData} and {@link
* org.apache.beam.sdk.state.ValueState} to reschedule {@link DelayedBundleApplication}.
*/
public static class StateAndTimerBundleCheckpointHandler<T> implements BundleCheckpointHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,12 @@ private ScheduledExecutorService getExecutor() {
synchronized (this) {
if (executor == null) {
executor =
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setNameFormat("ScheduledExecutor-thread")
.setDaemon(true)
.build());
}
return executor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.beam.sdk.fn.server.ServerFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -94,7 +95,12 @@ private EmbeddedEnvironmentFactory(
@SuppressWarnings("FutureReturnValueIgnored") // no need to monitor shutdown thread
public RemoteEnvironment createEnvironment(Environment environment, String workerId)
throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("CreateEnvironment-thread")
.build());
Future<?> fnHarness =
executor.submit(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
Expand Down Expand Up @@ -141,7 +142,9 @@ public SparkStructuredStreamingPipelineResult run(final Pipeline pipeline) {

final AbstractTranslationContext translationContext = translatePipeline(pipeline);

final ExecutorService executorService = Executors.newSingleThreadExecutor();
final ExecutorService executorService =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalSpark-thread").build());
final Future<?> submissionFuture =
executorService.submit(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
Expand Down Expand Up @@ -147,7 +148,9 @@ public SparkStructuredStreamingPipelineResult run(final Pipeline pipeline) {

final TranslationContext translationContext = translatePipeline(pipeline);

final ExecutorService executorService = Executors.newSingleThreadExecutor();
final ExecutorService executorService =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalSpark-thread").build());
final Future<?> submissionFuture =
executorService.submit(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
Expand Down Expand Up @@ -119,7 +120,12 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)

final SparkTranslationContext context =
translator.createTranslationContext(jsc, pipelineOptions, jobInfo);
final ExecutorService executorService = Executors.newSingleThreadExecutor();
final ExecutorService executorService =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("DefaultSparkRunner-thread")
.build());

LOG.info("Running job {} on Spark master {}", jobInfo.jobId(), jsc.master());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ public static class SdkHarness {
public SdkHarness() {
try {
// Setup execution-time servers
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("ProcessBundlesBenchmark-thread")
.build();
serverExecutor = Executors.newCachedThreadPool(threadFactory);
ServerFactory serverFactory = ServerFactory.createDefault();
dataServer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListenableFuture;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -1681,7 +1682,12 @@ private static class BoundedExecutorService {
BoundedExecutorService(ListeningExecutorService taskExecutor, int parallelism) {
this.taskExecutor = taskExecutor;
this.taskSubmitExecutor =
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
MoreExecutors.listeningDecorator(
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("BoundedBigQueryService-thread")
.build()));
this.semaphore = new Semaphore(parallelism);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;

/** A helper class for talking to Pubsub via grpc. */
Expand Down Expand Up @@ -166,7 +167,13 @@ public void close() {
private Channel newChannel() throws IOException {
checkState(publisherChannel != null, "PubsubGrpcClient has been closed");
ClientAuthInterceptor interceptor =
new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor());
new ClientAuthInterceptor(
credentials,
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("PubsubGrpcClient-thread")
.build()));
return ClientInterceptors.intercept(publisherChannel, interceptor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -357,7 +358,12 @@ public long getSplitBacklogBytes() {
// network I/O inside poll(). Polling only inside #advance(), especially with a small timeout
// like 100 milliseconds does not work well. This along with large receive buffer for
// consumer achieved best throughput in tests (see `defaultConsumerProperties`).
private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor();
private final ExecutorService consumerPollThread =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("KafkaConsumerPoll-thread")
.build());
private AtomicReference<Exception> consumerPollException = new AtomicReference<>();
private final SynchronousQueue<ConsumerRecords<byte[], byte[]>> availableRecordsQueue =
new SynchronousQueue<>();
Expand Down