[Part 7]: Introducing the Task Executor #157
So far, we have seen several classes on the worker side that are meant to help with the on-demand task execution. This diff introduces the task executor class that binds all these classes to implement the gateway.
    .newSingleThreadExecutor(
        new ThreadFactoryBuilder().setNameFormat(nameFormat).build()));

final List<Subscription> subscriptions = new ArrayList<>();
Where do we use these subscriptions?
Nowhere right now. But, I think the intention when I wrote the code (a long time ago) was that we would close these subscriptions when the task executor is about to be shut down. So, it just makes for a clean, graceful shutdown.
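A minimal sketch of the graceful-shutdown idea described above, assuming the subscriptions are simply collected and released in one pass. The `Subscription` interface here is a hypothetical stand-in for `rx.Subscription` so the example compiles without RxJava, and `SubscriptionTracker` is an illustrative name, not a class from this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for rx.Subscription (same two methods), so no RxJava is needed.
interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

// Hypothetical helper: tracks subscriptions and releases them all on shutdown.
class SubscriptionTracker {
    private final List<Subscription> subscriptions = new ArrayList<>();

    void track(Subscription subscription) {
        subscriptions.add(subscription);
    }

    // Unsubscribes every live subscription and returns how many were closed.
    // A failure in one subscription is logged and does not block the others.
    int closeAll() {
        int closed = 0;
        for (Subscription s : subscriptions) {
            try {
                if (!s.isUnsubscribed()) {
                    s.unsubscribe();
                    closed++;
                }
            } catch (RuntimeException e) {
                System.err.println("Failed to unsubscribe: " + e);
            }
        }
        subscriptions.clear();
        return closed;
    }
}
```

In a setup like this, `closeAll()` would be called from whatever shutdown hook the task executor already has, after the running task has been stopped.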
I love the testing this enables, visible in TaskExecutorTest.java. Great work. Approving on the condition that the CI check gets fixed.
    return performAction("disconnectTaskExecutor", taskExecutorDisconnection);
}

private CompletableFuture<Ack> performAction(String action, Object body) {
nit: An enum or similar closed-form solution might be safer than a free-form string for the action parameter.
hmm, this is strictly within the class - so I'm not sure what value enum would particularly add. If this was exposed, I agree enums are definitely better as they are stronger types than free-form strings.
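For reference, the enum alternative raised in this thread could look roughly like the sketch below. `GatewayAction`, `GatewayPaths`, and the `HEARTBEAT` constant are hypothetical names for illustration; only the `disconnectTaskExecutor` string appears in the diff.

```java
// Hypothetical closed set of actions; only "disconnectTaskExecutor" is taken from the diff.
enum GatewayAction {
    DISCONNECT_TASK_EXECUTOR("disconnectTaskExecutor"),
    HEARTBEAT("heartbeat"); // assumed name, for illustration only

    private final String path;

    GatewayAction(String path) {
        this.path = path;
    }

    String path() {
        return path;
    }
}

// Hypothetical helper that turns an action into a request path.
class GatewayPaths {
    // With an enum parameter, a misspelled action fails at compile time
    // instead of producing a bad URL at runtime.
    static String pathFor(String baseUrl, GatewayAction action) {
        return baseUrl.endsWith("/")
            ? baseUrl + action.path()
            : baseUrl + "/" + action.path();
    }
}
```

Since `performAction` is private, the benefit is limited to guarding against typos inside the class, which is the trade-off discussed above.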
}

private void startTaskExecutorServices() throws Exception {
    validateRunsInMainThread();
why do we need to run in main thread?
We want all state changes on the main thread similar to an actor. Any IO actions can run on the IO executor.
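A minimal sketch of that actor-style confinement, assuming a single-threaded "main" executor for state changes and a separate pool for I/O. The class and thread names are hypothetical, and this is not how the PR's RPC endpoint implements `validateRunsInMainThread()`; it only illustrates the pattern.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class: all mutations of `state` happen on one dedicated thread.
class MainThreadConfinedCounter {
    private final ExecutorService mainExecutor;
    private final ExecutorService ioExecutor = Executors.newCachedThreadPool();
    private volatile Thread mainThread;
    private int state; // only read or written on the main thread

    MainThreadConfinedCounter() {
        mainExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "main-thread-sketch");
            mainThread = t; // remember which thread owns the state
            return t;
        });
    }

    // Fails fast if a state change is attempted from the wrong thread.
    void validateRunsInMainThread() {
        if (Thread.currentThread() != mainThread) {
            throw new IllegalStateException(
                "Expected main thread but was " + Thread.currentThread().getName());
        }
    }

    // The (simulated) I/O runs on the I/O pool; the state change hops back to the main thread.
    CompletableFuture<Integer> fetchAndApply() {
        return CompletableFuture
            .supplyAsync(() -> 42, ioExecutor)
            .thenApplyAsync(value -> {
                validateRunsInMainThread();
                state += value;
                return state;
            }, mainExecutor);
    }

    void shutdown() {
        mainExecutor.shutdown();
        ioExecutor.shutdown();
    }
}
```

Because only one thread ever touches `state`, no locks are needed, and a misplaced call shows up as an immediate exception rather than a data race.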
...ol-plane-client/src/main/java/io/mantisrx/server/master/client/HighAvailabilityServices.java (resolved)
mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/TaskExecutor.java (outdated, resolved)
    } catch (Exception inner) {
        log.error("Disconnection has also failed", inner);
    }
    throw e;
Is the handling policy here to crash and restart the whole process on each request failure? Shall we consider some retries?
No, the process does not restart. When the resource manager gateway connection fails to establish, the service goes from the "RUNNING" to the "FAILED" state. The task executor at this point (which is listening to these state transitions) would try to reestablish a new connection with the resource manager. So, the process doesn't restart.
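The reconnect-on-failure flow described above can be modeled roughly as follows. This is a plain-Java sketch with hypothetical types (`CxnState`, `CxnListener`, `GatewayConnectionSketch`, `ReconnectingOwnerSketch`); the PR itself relies on service state transitions rather than these classes.

```java
// Hypothetical, simplified connection states.
enum CxnState { NEW, RUNNING, FAILED }

// Hypothetical listener notified when a connection fails.
interface CxnListener {
    void failed(CxnState from, Throwable cause);
}

// Hypothetical connection that reports its own failure to a listener.
class GatewayConnectionSketch {
    private CxnState state = CxnState.NEW;
    private final CxnListener listener;

    GatewayConnectionSketch(CxnListener listener) {
        this.listener = listener;
    }

    void start() {
        state = CxnState.RUNNING;
    }

    // Moves to FAILED and tells the owner which state the failure came from.
    void fail(Throwable cause) {
        CxnState from = state;
        state = CxnState.FAILED;
        listener.failed(from, cause);
    }

    CxnState state() {
        return state;
    }
}

// Hypothetical owner (the task executor's role): replaces a failed connection
// with a fresh one instead of restarting the process.
class ReconnectingOwnerSketch {
    private GatewayConnectionSketch current;
    private int connectionsOpened;

    void connect() {
        GatewayConnectionSketch cxn =
            new GatewayConnectionSketch((from, cause) -> connect());
        cxn.start();
        current = cxn;
        connectionsOpened++;
    }

    GatewayConnectionSketch current() {
        return current;
    }

    int connectionsOpened() {
        return connectionsOpened;
    }
}
```

A real implementation would likely add a backoff between attempts; the sketch reconnects immediately to keep the flow visible.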
}

@Override
public void runOneIteration() throws Exception {
The method name doesn't seem to match the work inside (sending heartbeat report).
This is actually a method of AbstractScheduledService. It's just a nice framework to write these services.
 * limitations under the License.
 */

package io.mantisrx.server.worker;
nit: this seems generic enough for both worker/server.
Hmm, we don't have a usage for this in the server right now. Will move it at a later point when there's a concrete usage.
import io.mantisrx.runtime.MachineDefinition;

public class MachineDefinitionUtils {
    public static MachineDefinition sys(WorkerPorts workerPorts) {
"WithWorkerPorts"?
public TaskExecutor(RpcService rpcService,
    WorkerConfiguration workerConfiguration,
    HighAvailabilityServices highAvailabilityServices, ClassLoaderHandle classLoaderHandle,
nit: inconsistent format on line breaks
Good point. Cleaned it up.
    .register(new ResourceManagerChangeListener());
}

public CompletableFuture<Void> awaitRunning() {
Is this used? Cannot find any ref to it.
This is used in the tests. In the tests, it's important to know when the task executor has started so that we can start issuing commands to it.
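A rough sketch of how such a start signal can work, assuming a `CompletableFuture<Void>` that is completed once startup finishes. `StartSignalSketch` and `markRunning()` are hypothetical names; only the `awaitRunning()` signature comes from the diff.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical component that exposes a future completing when startup is done.
class StartSignalSketch {
    private final CompletableFuture<Void> running = new CompletableFuture<>();

    // Called by the component itself at the end of its startup sequence.
    void markRunning() {
        running.complete(null);
    }

    // Called if startup fails, so waiting callers see the error instead of hanging.
    void markFailed(Throwable cause) {
        running.completeExceptionally(cause);
    }

    // Callers (for example tests) block or chain on this before issuing commands.
    CompletableFuture<Void> awaitRunning() {
        return running;
    }
}
```

A test would then do something like `executor.awaitRunning().get(5, TimeUnit.SECONDS)` before sending its first request, which avoids sleeping for an arbitrary amount of time.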
validateRunsInMainThread();

masterMonitor = highAvailabilityServices.getMasterClientApi();
RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory());
why are we doing this?
mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/TaskExecutor.java (two resolved threads)
private ResourceManagerGatewayCxn currentResourceManagerCxn;
private TaskExecutorReport currentReport;
private Task currentTask;
private Subscription currentTaskStatusSubscription;
why is this being tracked?
Checklist
./gradlew build compiles code correctly
./gradlew test passes all tests
CONTRIBUTING.md