Skip to content

Commit 1dfc5ce

Browse files
prateekmjagadish-v0
authored andcommitted
Misc. minor cleanup.
1. Added a meaningful name for the container thread pool threads. 2. Made the thread names for framework threads consistent. 3. Made a couple of monitoring/metrics threads daemon. 4. Fixed a few checkstyle warning about missing param/throws documentation. Author: Prateek Maheshwari <pmaheshw@linkedin.com> Reviewers: Jagadish <jagadish@apache.org>, Jacob M <jmakes@apache.org> Closes apache#433 from prateekm/container-thread-pool-name
1 parent bc0a47b commit 1dfc5ce

File tree

16 files changed

+101
-201
lines changed

16 files changed

+101
-201
lines changed

samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.samza.container.disk;
2020

21+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
22+
2123
import org.slf4j.Logger;
2224
import org.slf4j.LoggerFactory;
2325

@@ -34,9 +36,7 @@
3436
import java.util.concurrent.ConcurrentMap;
3537
import java.util.concurrent.Executors;
3638
import java.util.concurrent.ScheduledExecutorService;
37-
import java.util.concurrent.ThreadFactory;
3839
import java.util.concurrent.TimeUnit;
39-
import java.util.concurrent.atomic.AtomicInteger;
4040

4141
/**
4242
* An implementation of {@link DiskSpaceMonitor} that polls for disk usage based on a specified
@@ -47,7 +47,6 @@
4747
public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor {
4848
private enum State { INIT, RUNNING, STOPPED }
4949

50-
private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryImpl();
5150
private static final Logger log = LoggerFactory.getLogger(PollingScanDiskSpaceMonitor.class);
5251

5352
// Note: we use this as a set where the value is always Boolean.TRUE.
@@ -57,7 +56,11 @@ private enum State { INIT, RUNNING, STOPPED }
5756
private final Object lock = new Object();
5857

5958
private final ScheduledExecutorService schedulerService =
60-
Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
59+
Executors.newSingleThreadScheduledExecutor(
60+
new ThreadFactoryBuilder()
61+
.setNameFormat("Samza PollingScanDiskSpaceMonitor Thread-%d")
62+
.setDaemon(true)
63+
.build());
6164
private final Set<Path> watchPaths;
6265
private final long pollingIntervalMillis;
6366

@@ -197,13 +200,4 @@ private void updateSample() {
197200
}
198201
}
199202
}
200-
201-
private static class ThreadFactoryImpl implements ThreadFactory {
202-
private static final String PREFIX = "Samza-" + PollingScanDiskSpaceMonitor.class.getSimpleName() + "-";
203-
private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
204-
205-
public Thread newThread(Runnable runnable) {
206-
return new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
207-
}
208-
}
209203
}

samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,16 @@
1818
*/
1919
package org.apache.samza.container.host;
2020

21+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
22+
2123
import org.slf4j.Logger;
2224
import org.slf4j.LoggerFactory;
2325

2426
import java.util.concurrent.ConcurrentHashMap;
2527
import java.util.concurrent.ConcurrentMap;
2628
import java.util.concurrent.Executors;
2729
import java.util.concurrent.ScheduledExecutorService;
28-
import java.util.concurrent.ThreadFactory;
2930
import java.util.concurrent.TimeUnit;
30-
import java.util.concurrent.atomic.AtomicInteger;
3131

3232
/**
3333
* An implementation of {@link SystemStatisticsMonitor} for unix and mac platforms. Users can implement their own
@@ -42,8 +42,6 @@
4242
* This class is thread-safe.
4343
*/
4444
public class StatisticsMonitorImpl implements SystemStatisticsMonitor {
45-
46-
private static final ThreadFactory THREAD_FACTORY = new StatisticsMonitorThreadFactory();
4745
private static final Logger LOG = LoggerFactory.getLogger(StatisticsMonitorImpl.class);
4846

4947
/**
@@ -60,7 +58,8 @@ public class StatisticsMonitorImpl implements SystemStatisticsMonitor {
6058

6159
// Single threaded executor to handle callback invocations.
6260
private final ScheduledExecutorService schedulerService =
63-
Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
61+
Executors.newSingleThreadScheduledExecutor(
62+
new ThreadFactoryBuilder().setNameFormat("Samza StatisticsMonitor Thread-%d").setDaemon(true).build());
6463

6564
// Use this as a set with value always set to True
6665
private final ConcurrentMap<StatisticsMonitorImpl.Listener, Boolean> listenerSet = new ConcurrentHashMap<>();
@@ -174,15 +173,4 @@ public boolean registerListener(Listener listener) {
174173
}
175174
}
176175
}
177-
178-
// A convenience class that provides named threads
179-
private static class StatisticsMonitorThreadFactory implements ThreadFactory {
180-
private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger();
181-
private static final String PREFIX = "Samza-StatisticsMonitor-Thread-";
182-
183-
@Override
184-
public Thread newThread(Runnable runnable) {
185-
return new Thread(runnable, PREFIX + INSTANCE_COUNT.getAndIncrement());
186-
}
187-
}
188176
}

samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public CoordinatorStreamManager(Config coordinatorSystemConfig, MetricsRegistry
7979
/**
8080
* Register source with the coordinator stream.
8181
*
82-
* @param source
82+
* @param source source to register with the coordinator stream
8383
*/
8484
public void register(String source) {
8585
if (coordinatorStreamConsumer != null) {

samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import java.util
2626
import java.util.Base64
2727
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
2828

29+
import com.google.common.util.concurrent.ThreadFactoryBuilder
2930
import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
3031
import org.apache.samza.config.JobConfig.Config2Job
3132
import org.apache.samza.config.MetricsConfig.Config2Metrics
@@ -405,10 +406,14 @@ object SamzaContainer extends Logging {
405406
val threadPoolSize = config.getThreadPoolSize
406407
info("Got thread pool size: " + threadPoolSize)
407408

408-
val taskThreadPool = if (!singleThreadMode && threadPoolSize > 0)
409-
Executors.newFixedThreadPool(threadPoolSize)
410-
else
409+
410+
val taskThreadPool = if (!singleThreadMode && threadPoolSize > 0) {
411+
Executors.newFixedThreadPool(threadPoolSize,
412+
new ThreadFactoryBuilder().setNameFormat("Samza Container Thread-%d").build())
413+
} else {
411414
null
415+
}
416+
412417

413418
val finalTaskFactory = TaskFactoryUtil.finalizeTaskFactory(
414419
taskFactory,

samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,18 @@
1919

2020
package org.apache.samza.metrics
2121

22-
import scala.collection._
23-
import scala.collection.JavaConverters._
24-
import java.lang.management.ManagementFactory
22+
import com.google.common.util.concurrent.ThreadFactoryBuilder
23+
import com.sun.management.OperatingSystemMXBean
24+
import com.sun.management.UnixOperatingSystemMXBean
25+
import org.apache.samza.util.Logging
26+
2527
import java.lang.Thread.State._
28+
import java.lang.management.ManagementFactory
2629
import java.util.concurrent.Executors
2730
import java.util.concurrent.TimeUnit
2831

29-
import com.sun.management.{OperatingSystemMXBean, UnixOperatingSystemMXBean}
30-
import org.apache.samza.util.Logging
31-
import org.apache.samza.util.DaemonThreadFactory
32-
33-
/**
34-
* Companion object for class JvmMetrics encapsulating various constants
35-
*/
36-
object JvmMetrics {
37-
val JVM_METRICS_THREAD_NAME_PREFIX = "JVM-METRICS"
38-
}
32+
import scala.collection.JavaConverters._
33+
import scala.collection._
3934

4035
/**
4136
* Straight up ripoff of Hadoop's metrics2 JvmMetrics class.
@@ -49,7 +44,8 @@ class JvmMetrics(val registry: MetricsRegistry) extends MetricsHelper with Runna
4944
val threadMXBean = ManagementFactory.getThreadMXBean()
5045
val osMXBean = ManagementFactory.getOperatingSystemMXBean()
5146
var gcBeanCounters = Map[String, (Counter, Counter)]()
52-
val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(JvmMetrics.JVM_METRICS_THREAD_NAME_PREFIX))
47+
val executor = Executors.newSingleThreadScheduledExecutor(
48+
new ThreadFactoryBuilder().setNameFormat("Samza JvmMetrics Thread-%d").setDaemon(true).build())
5349

5450
// jvm metrics
5551
val gMemNonHeapUsedM = newGauge("mem-non-heap-used-mb", 0.0F)

samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,20 @@
1919

2020
package org.apache.samza.metrics.reporter
2121

22+
import com.google.common.util.concurrent.ThreadFactoryBuilder
23+
import org.apache.samza.metrics._
24+
import org.apache.samza.serializers.Serializer
25+
import org.apache.samza.system.OutgoingMessageEnvelope
26+
import org.apache.samza.system.SystemProducer
27+
import org.apache.samza.system.SystemStream
28+
import org.apache.samza.util.Logging
29+
2230
import java.util.HashMap
2331
import java.util.Map
24-
import scala.collection.JavaConverters._
25-
import org.apache.samza.util.Logging
26-
import org.apache.samza.metrics.Counter
27-
import org.apache.samza.metrics.Gauge
28-
import org.apache.samza.metrics.Timer
29-
import org.apache.samza.metrics.MetricsReporter
30-
import org.apache.samza.metrics.MetricsVisitor
31-
import org.apache.samza.metrics.ReadableMetricsRegistry
3232
import java.util.concurrent.Executors
33-
import org.apache.samza.util.DaemonThreadFactory
3433
import java.util.concurrent.TimeUnit
35-
import org.apache.samza.serializers.Serializer
36-
import org.apache.samza.system.SystemProducer
37-
import org.apache.samza.system.SystemStream
38-
import org.apache.samza.system.OutgoingMessageEnvelope
3934

40-
/**
41-
* Companion object for class MetricsSnapshotReporter encapsulating various constants
42-
*/
43-
object MetricsSnapshotReporter {
44-
val METRIC_SNAPSHOT_REPORTER_THREAD_NAME_PREFIX = "METRIC-SNAPSHOT-REPORTER"
45-
}
35+
import scala.collection.JavaConverters._
4636

4737
/**
4838
* MetricsSnapshotReporter is a generic metrics reporter that sends metrics to a stream.
@@ -66,7 +56,8 @@ class MetricsSnapshotReporter(
6656
serializer: Serializer[MetricsSnapshot] = null,
6757
clock: () => Long = () => { System.currentTimeMillis }) extends MetricsReporter with Runnable with Logging {
6858

69-
val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(MetricsSnapshotReporter.METRIC_SNAPSHOT_REPORTER_THREAD_NAME_PREFIX))
59+
val executor = Executors.newSingleThreadScheduledExecutor(
60+
new ThreadFactoryBuilder().setNameFormat("Samza MetricsSnapshotReporter Thread-%d").setDaemon(true).build())
7061
val resetTime = clock()
7162
var registries = List[(String, ReadableMetricsRegistry)]()
7263

samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,20 @@
1919

2020
package org.apache.samza.system.filereader
2121

22-
import org.apache.samza.util.BlockingEnvelopeMap
22+
import com.google.common.util.concurrent.ThreadFactoryBuilder
2323
import org.apache.samza.metrics.MetricsRegistry
24+
import org.apache.samza.system.IncomingMessageEnvelope
2425
import org.apache.samza.system.SystemStreamPartition
25-
import scala.collection.mutable.Map
26+
import org.apache.samza.util.BlockingEnvelopeMap
27+
import org.apache.samza.util.Logging
28+
2629
import java.io.RandomAccessFile
27-
import org.apache.samza.system.IncomingMessageEnvelope
28-
import java.util.concurrent.LinkedBlockingQueue
29-
import java.util.concurrent.Executors
3030
import java.util.concurrent.ExecutorService
31-
import org.apache.samza.util.DaemonThreadFactory
32-
import org.apache.samza.util.Logging
31+
import java.util.concurrent.Executors
32+
import java.util.concurrent.LinkedBlockingQueue
33+
34+
import scala.collection.mutable.Map
3335

34-
object FileReaderSystemConsumer {
35-
/**
36-
* prefix for the file reader system thread names
37-
*/
38-
val FILE_READER_SYSTEM_THREAD_PREFIX = "filereader-"
39-
}
4036

4137
class FileReaderSystemConsumer(
4238
systemName: String,
@@ -77,8 +73,9 @@ class FileReaderSystemConsumer(
7773
* start one thread for each file reader
7874
*/
7975
override def start {
80-
pool = Executors.newFixedThreadPool(systemStreamPartitionAndStartingOffset.size, new DaemonThreadFactory(FileReaderSystemConsumer.FILE_READER_SYSTEM_THREAD_PREFIX))
81-
systemStreamPartitionAndStartingOffset.map { case (ssp, offset) => pool.execute(readInputFiles(ssp, offset)) }
76+
pool = Executors.newFixedThreadPool(systemStreamPartitionAndStartingOffset.size,
77+
new ThreadFactoryBuilder().setNameFormat("Samza FileReader Thread-%d").setDaemon(true).build())
78+
systemStreamPartitionAndStartingOffset.foreach { case (ssp, offset) => pool.execute(readInputFiles(ssp, offset)) }
8279
}
8380

8481
/**

samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala

Lines changed: 0 additions & 39 deletions
This file was deleted.

samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala

Lines changed: 0 additions & 37 deletions
This file was deleted.

samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,11 @@ import kafka.consumer.ConsumerConfig
3131
import kafka.message.MessageSet
3232
import org.apache.samza.SamzaException
3333
import org.apache.samza.util.ExponentialSleepStrategy
34+
import org.apache.samza.util.KafkaUtil
3435
import org.apache.samza.util.Logging
35-
import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
3636

3737
import scala.collection.JavaConverters._
3838
import scala.collection.concurrent
39-
import org.apache.samza.util.KafkaUtil
40-
41-
/**
42-
* Companion object for class JvmMetrics encapsulating various constants
43-
*/
44-
object BrokerProxy {
45-
val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY-"
46-
}
4739

4840
/**
4941
* A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing
@@ -294,7 +286,7 @@ class BrokerProxy(
294286
if (!thread.isAlive) {
295287
info("Starting " + toString)
296288
thread.setDaemon(true)
297-
thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
289+
thread.setName("Samza BrokerProxy " + thread.getName)
298290
thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
299291
override def uncaughtException(t: Thread, e: Throwable) = error("Uncaught exception in broker proxy:", e)
300292
})

0 commit comments

Comments
 (0)