Skip to content

Commit

Permalink
Add task name based tracing to scheduled tasks.
Browse files Browse the repository at this point in the history
  • Loading branch information
timw authored and pkendall64 committed Oct 31, 2022
1 parent 8c8e67c commit d533fc8
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 14 deletions.
15 changes: 7 additions & 8 deletions core/src/main/java/com/orientechnologies/orient/core/Orient.java
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,11 @@ public Orient shutdown() {
}

public TimerTask scheduleTask(final Runnable task, final long delay, final long period) {
return scheduleTask(task.getClass().getSimpleName(), task, delay, period);
}

public TimerTask scheduleTask(
final String taskName, final Runnable task, final long delay, final long period) {
engineLock.readLock().lock();
try {
final TimerTask timerTask =
Expand All @@ -421,16 +426,10 @@ public void run() {
task.run();
} catch (Exception e) {
OLogManager.instance()
.error(
this,
"Error during execution of task " + task.getClass().getSimpleName(),
e);
.error(Orient.this, "Error during execution of task " + taskName, e);
} catch (Error e) {
OLogManager.instance()
.error(
this,
"Error during execution of task " + task.getClass().getSimpleName(),
e);
.error(Orient.this, "Error during execution of task " + taskName, e);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.orientechnologies.common.profiler.OAbstractProfiler;
import com.orientechnologies.common.profiler.OProfiler;
import com.orientechnologies.common.thread.OThreadPoolExecutors;
import com.orientechnologies.common.thread.TracingExecutorService;
import com.orientechnologies.common.util.OCallable;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest;
Expand Down Expand Up @@ -96,7 +97,7 @@ public class ODistributedDatabaseImpl implements ODistributedDatabase {
private final AtomicLong operationsRunnig = new AtomicLong(0);
private final ODistributedSynchronizedSequence sequenceManager;
private final AtomicLong pending = new AtomicLong();
private final ThreadPoolExecutor requestExecutor;
private final TracingExecutorService requestExecutor;
private final OLockManager lockManager = new OLockManagerImpl();
private final Set<OTransactionId> inQueue = Collections.newSetFromMap(new ConcurrentHashMap<>());
private OFreezeGuard freezeGuard;
Expand Down Expand Up @@ -173,7 +174,7 @@ public void endOperation() {
this.localNodeName = manager.getLocalNodeName();

this.requestExecutor =
(ThreadPoolExecutor)OThreadPoolExecutors.newScalingThreadPool(
OThreadPoolExecutors.newScalingThreadPool(
String.format(
"OrientDB DistributedWorker node=%s db=%s", getLocalNodeName(), databaseName),
0,
Expand Down Expand Up @@ -258,7 +259,7 @@ public Object getValue() {
new OAbstractProfiler.OProfilerHookValue() {
@Override
public Object getValue() {
return (long) requestExecutor.getPoolSize();
return (long) ((ThreadPoolExecutor) requestExecutor).getPoolSize();
}
},
"distributed.db.*.workerThreads");
Expand Down Expand Up @@ -305,6 +306,7 @@ public void reEnqueue(
pending.incrementAndGet();
Orient.instance()
.scheduleTask(
String.format("DistributedDatabase[%s].reEnqueue", databaseName),
() -> {
try {
processRequest(
Expand Down Expand Up @@ -343,6 +345,9 @@ public void processRequest(
+ "' discarding");
}
}
final String taskName =
String.format(
"DistributedDatabase[%s].processRequest.%s", getDatabaseName(), task.getName());
synchronized (this) {
task.received(request, this);
manager.messageReceived(request);
Expand All @@ -364,7 +369,7 @@ public void processRequest(
}
};
try {
this.requestExecutor.submit(executeTask);
this.requestExecutor.submit(taskName, executeTask);
} catch (RejectedExecutionException e) {
task.finished(this);
this.lockManager.unlock(guards);
Expand All @@ -380,6 +385,7 @@ public void processRequest(
} else {
try {
this.requestExecutor.submit(
taskName,
() -> {
execute(request);
});
Expand Down Expand Up @@ -892,7 +898,7 @@ public long getReceivedRequests() {

@Override
public long getProcessedRequests() {
return requestExecutor.getCompletedTaskCount();
return ((ThreadPoolExecutor) requestExecutor).getCompletedTaskCount();
}

public void onDropShutdown() {
Expand Down Expand Up @@ -1310,7 +1316,7 @@ public String dump() {
"\n\nDATABASE '" + databaseName + "' ON SERVER '" + manager.getLocalNodeName() + "'");

buffer.append("\n- MESSAGES IN QUEUES");
buffer.append(" (" + (requestExecutor.getPoolSize()) + " WORKERS):");
buffer.append(" (" + (((ThreadPoolExecutor) requestExecutor).getPoolSize()) + " WORKERS):");

return buffer.toString();
}
Expand Down

0 comments on commit d533fc8

Please sign in to comment.