diff --git a/CHANGES.md b/CHANGES.md index 92b578d16c85..e331f27b249a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -67,6 +67,7 @@ ## New Features / Improvements * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Name all Java threads to aid in debugging ([#23049](https://github.com/apache/beam/issues/23049)). ## Breaking Changes diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java index 2b1d3997ebaf..b0067589ea06 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Server; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -201,7 +202,12 @@ public static void start( } worker.startStatusPages(); worker.start(); - ExecutorService executor = Executors.newSingleThreadExecutor(); + ExecutorService executor = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("ControlClient-thread") + .build()); executor.execute( () -> { // Task to get new client connections while (true) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FetchAndFilterStreamingSideInputsOperation.java 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FetchAndFilterStreamingSideInputsOperation.java index 921418b633b9..badd629425cb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FetchAndFilterStreamingSideInputsOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FetchAndFilterStreamingSideInputsOperation.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -96,7 +97,12 @@ public FetchAndFilterStreamingSideInputsOperation( (StreamingModeExecutionContext.StreamingModeStepContext) stepContext.namespacedToUser()); this.elementsToProcess = new LinkedBlockingQueue<>(); - this.singleThreadExecutor = Executors.newSingleThreadExecutor(); + this.singleThreadExecutor = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("FetchAndFilterStreamingSideInput-thread") + .build()); } /** A {@link PCollectionView} which forwards all calls to its delegate. 
*/ diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/JfrInterop.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/JfrInterop.java index 1e6539c2ba69..116e7a235d46 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/JfrInterop.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/JfrInterop.java @@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Exposes methods to interop with JFR. This is only supported on java 9 and up, java 8 does not @@ -35,7 +36,9 @@ */ class JfrInterop { // ensure only a single JFR profile is running at once - private static final ExecutorService JFR_EXECUTOR = Executors.newSingleThreadExecutor(); + private static final ExecutorService JFR_EXECUTOR = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("JFRprofile-thread").build()); private final Constructor recordingCtor; private final Method recordingStart; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java index b35783946ecc..6a5e608f5b8f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java @@ -103,6 +103,7 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Verify; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -626,7 +627,12 @@ private abstract class AbstractWindmillStream implements Wi private final StreamObserverFactory streamObserverFactory = StreamObserverFactory.direct(streamDeadlineSeconds * 2); private final Function, StreamObserver> clientFactory; - private final Executor executor = Executors.newSingleThreadExecutor(); + private final Executor executor = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("WindmillStream-thread") + .build()); // The following should be protected by synchronizing on this, except for // the atomics which may be read atomically for status pages. diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java index f29a1e956c90..a86dc0650712 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java @@ -46,7 +46,7 @@ public class BundleCheckpointHandlers { /** * A {@link BundleCheckpointHandler} which uses {@link - * org.apache.beam.runners.core.TimerInternals.TimerData} ans {@link + * org.apache.beam.runners.core.TimerInternals.TimerData} and {@link * org.apache.beam.sdk.state.ValueState} to reschedule {@link DelayedBundleApplication}. 
*/ public static class StateAndTimerBundleCheckpointHandler implements BundleCheckpointHandler { diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java index f17695c12310..35ac37bcff34 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java @@ -158,7 +158,12 @@ private ScheduledExecutorService getExecutor() { synchronized (this) { if (executor == null) { executor = - Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build()); + Executors.newScheduledThreadPool( + 1, + new ThreadFactoryBuilder() + .setNameFormat("ScheduledExecutor-thread") + .setDaemon(true) + .build()); } return executor; } diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java index 27a2e33ed3e1..1cb62136edcf 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.fn.server.ServerFactory; import org.apache.beam.sdk.fn.stream.OutboundObserverFactory; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,7 
+95,12 @@ private EmbeddedEnvironmentFactory( @SuppressWarnings("FutureReturnValueIgnored") // no need to monitor shutdown thread public RemoteEnvironment createEnvironment(Environment environment, String workerId) throws Exception { - ExecutorService executor = Executors.newSingleThreadExecutor(); + ExecutorService executor = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("CreateEnvironment-thread") + .build()); Future fnHarness = executor.submit( () -> { diff --git a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java index 84c1bb34bebc..98b068a9cce7 100644 --- a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java +++ b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.spark.SparkEnv$; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.metrics.MetricsSystem; @@ -141,7 +142,9 @@ public SparkStructuredStreamingPipelineResult run(final Pipeline pipeline) { final AbstractTranslationContext translationContext = translatePipeline(pipeline); - final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final ExecutorService executorService = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalSpark-thread").build()); final Future submissionFuture = executorService.submit( () -> { diff --git 
a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java index 68f54ac93bf0..ee0d2b1deb91 100644 --- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java +++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.spark.SparkEnv$; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.metrics.MetricsSystem; @@ -147,7 +148,9 @@ public SparkStructuredStreamingPipelineResult run(final Pipeline pipeline) { final TranslationContext translationContext = translatePipeline(pipeline); - final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final ExecutorService executorService = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalSpark-thread").build()); final Future submissionFuture = executorService.submit( () -> { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java index 0f31f619cfce..4a7ffd13ee60 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java @@ -55,6 +55,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import 
org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.api.java.JavaStreamingListener; @@ -119,7 +120,12 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) final SparkTranslationContext context = translator.createTranslationContext(jsc, pipelineOptions, jobInfo); - final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final ExecutorService executorService = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("DefaultSparkRunner-thread") + .build()); LOG.info("Running job {} on Spark master {}", jobInfo.jobId(), jsc.master()); diff --git a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java index 8fc9c95b5b2d..0107f82866bc 100644 --- a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java +++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java @@ -120,7 +120,11 @@ public static class SdkHarness { public SdkHarness() { try { // Setup execution-time servers - ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build(); + ThreadFactory threadFactory = + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("ProcessBundlesBenchmark-thread-%d") + .build(); serverExecutor = Executors.newCachedThreadPool(threadFactory); ServerFactory serverFactory = ServerFactory.createDefault(); dataServer = diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index be418e7cea94..9624f3ddb2a1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -135,6 +135,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListenableFuture; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.slf4j.Logger; @@ -1681,7 +1682,12 @@ private static class BoundedExecutorService { BoundedExecutorService(ListeningExecutorService taskExecutor, int parallelism) { this.taskExecutor = taskExecutor; this.taskSubmitExecutor = - MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + MoreExecutors.listeningDecorator( + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("BoundedBigQueryService-thread") + .build())); this.semaphore = new Semaphore(parallelism); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java index f70d30a55d63..3378523b3930 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -59,6 +59,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.checkerframework.checker.nullness.qual.Nullable; /** A helper class for talking to Pubsub via grpc. */ @@ -166,7 +167,13 @@ public void close() { private Channel newChannel() throws IOException { checkState(publisherChannel != null, "PubsubGrpcClient has been closed"); ClientAuthInterceptor interceptor = - new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor()); + new ClientAuthInterceptor( + credentials, + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("PubsubGrpcClient-thread") + .build())); return ClientInterceptors.intercept(publisherChannel, interceptor); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index aca59813a77d..a38b92e17882 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -54,6 +54,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.kafka.clients.consumer.Consumer; import 
org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -357,7 +358,12 @@ public long getSplitBacklogBytes() { // network I/O inside poll(). Polling only inside #advance(), especially with a small timeout // like 100 milliseconds does not work well. This along with large receive buffer for // consumer achieved best throughput in tests (see `defaultConsumerProperties`). - private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor(); + private final ExecutorService consumerPollThread = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("KafkaConsumerPoll-thread") + .build()); private AtomicReference consumerPollException = new AtomicReference<>(); private final SynchronousQueue> availableRecordsQueue = new SynchronousQueue<>();