Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve selector thread name #3028

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public boolean isStopped() {
* need to be read.
*/
protected class SelectAcceptThread extends AbstractSelectThread {
private static final String SELECT_THREAD_NAME_FORMAT = "thrift-select-thread";

// The server transport on which new client transports will be accepted
private final TNonblockingServerTransport serverTransport;
Expand All @@ -122,6 +123,7 @@ public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
throws IOException {
this.serverTransport = serverTransport;
serverTransport.registerSelector(selector);
setName(SELECT_THREAD_NAME_FORMAT);
}

public boolean isStopped() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public TThreadedSelectorServer(Args args) {
protected boolean startThreads() {
try {
for (int i = 0; i < args.selectorThreads; ++i) {
selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread, i));
}
acceptThread =
new AcceptThread(
Expand Down Expand Up @@ -333,7 +333,7 @@ private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int
* hand off to the IO selector threads
*/
protected class AcceptThread extends Thread {

private static final String SELECT_THREAD_NAME_FORMAT = "thrift-select-thread";
// The listen socket to accept on
private final TNonblockingServerTransport serverTransport;
private final Selector acceptSelector;
Expand All @@ -352,6 +352,7 @@ public AcceptThread(
this.threadChooser = threadChooser;
this.acceptSelector = SelectorProvider.provider().openSelector();
this.serverTransport.registerSelector(acceptSelector);
setName(SELECT_THREAD_NAME_FORMAT);
}

/**
Expand Down Expand Up @@ -468,6 +469,7 @@ protected class SelectorThread extends AbstractSelectThread {
private final BlockingQueue<TNonblockingTransport> acceptedQueue;
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
private static final long MONITOR_PERIOD = 1000L;
private static final String SELECT_THREAD_NAME_FORMAT = "thrift-select-rw-thread-%d";
private int jvmBug = 0;

/**
Expand All @@ -488,6 +490,19 @@ public SelectorThread(int maxPendingAccepts) throws IOException {
this(createDefaultAcceptQueue(maxPendingAccepts));
}

/**
* Set up the SelectorThread with an bounded queue for incoming accepts.
*
* @param maxPendingAccepts The max number of pending accepts.
* @param id the thread id.
* @throws IOException
*/
public SelectorThread(int maxPendingAccepts, int id) throws IOException {
this(
createDefaultAcceptQueue(maxPendingAccepts),
String.format(SELECT_THREAD_NAME_FORMAT, id));
}

/**
* Set up the SelectorThread with a specified queue for connections.
*
Expand All @@ -499,6 +514,19 @@ public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws
this.acceptedQueue = acceptedQueue;
}

/**
* Set up the SelectorThread with a specified queue for connections.
*
* @param acceptedQueue The BlockingQueue implementation for holding incoming accepted
* @param name the thread name.
* @throws IOException if a selector cannot be created.
*/
public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue, String name)
throws IOException {
this.acceptedQueue = acceptedQueue;
setName(name);
}

/**
* Hands off an accepted connection to be handled by this thread. This method will block if the
* queue for new connections is at capacity.
Expand Down
Loading