-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Part 15]: Resource Cluster Abstraction and an implementation based on Akka actor #166
[Part 15]: Resource Cluster Abstraction and an implementation based on Akka actor #166
Conversation
@@ -134,4 +136,8 @@ void storeAndUpdateWorkers(final IMantisWorkerMetadata existingWorker, final IMa | |||
List<String> initActiveVmAttributeValuesList() throws IOException; | |||
|
|||
void setActiveVmAttributeValuesList(final List<String> vmAttributesList) throws IOException; | |||
|
|||
TaskExecutorRegistration getTaskExecutorFor(TaskExecutorID taskExecutorID) throws IOException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's sync offline to see how to resolve the internal cass problem for these new data required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 thanks!
if (taskExecutorRegistration.isPresent()) { | ||
sender().tell(taskExecutorRegistration.get(), self()); | ||
} else { | ||
sender().tell(new Status.Failure(new Exception("")), self()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: error message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Thanks for spotting this.
...l-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java
Outdated
Show resolved
Hide resolved
.thenApply(Ack.class::cast) | ||
.toCompletableFuture() | ||
.whenComplete((ack, dontCare) -> | ||
mapper.onTaskExecutorDiscovered(clusterID, taskExecutorID)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if there is an error from the future result?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it doesn't matter. I still want the association to be made.
@@ -521,4 +523,13 @@ public List<String> initActiveVmAttributeValuesList() throws IOException { | |||
} | |||
} | |||
|
|||
@Override | |||
public TaskExecutorRegistration getTaskExecutorFor(TaskExecutorID taskExecutorID) throws IOException { | |||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This SimpleCachedFileStorageProvider
doesn't appear to do that the name implies. It looks a lot like the NoOp one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing it out. Honestly, I was not very impressed by that implementation and thought I could fix it later for OSS. But, I think it might be a good idea to keep this as-is for the time being and revisit it at a later point.
*/ | ||
@ToString(of = {"clusterID"}) | ||
@Slf4j | ||
class ResourceClusterActor extends AbstractActor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This represents the main actor that needs to be reviewed fully for this change.
...trol-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java
Outdated
Show resolved
Hide resolved
private final ClusterID clusterID; | ||
private final MantisJobStore mantisJobStore; | ||
|
||
public static Props props(final ClusterID clusterID, final Duration heartbeatTimeout, Clock clock, RpcService rpcService) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: package-private since the class is not exposed.
...trol-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java
Show resolved
Hide resolved
TaskExecutorRegistration registration = | ||
mantisJobStore.getTaskExecutor(request.getTaskExecutorID()); | ||
setupTaskExecutorStateIfNecessary(request.getTaskExecutorID()); | ||
self().tell(registration, self()); | ||
self().tell( | ||
new TaskExecutorStatusChange( | ||
registration.getTaskExecutorID(), | ||
registration.getClusterID(), | ||
TaskExecutorReport.occupied(request.getWorkerId())), | ||
self()); | ||
sender.tell(Ack.getInstance(), self()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
write a test for this workflow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
...trol-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java
Show resolved
Hide resolved
matchedExecutor.get().getValue().onAssignment(request.getWorkerId()); | ||
sender().tell(matchedExecutor.get().getKey(), self()); | ||
} else { | ||
sender().tell(new Status.Failure(new NoResourceAvailableException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we add a TODO here that we should expand the cluster?
if (state.isRegistered() && state.getGateway().isDone()) { | ||
sender().tell(state.getGateway().join(), self()); | ||
} else { | ||
sender().tell(new Status.Failure(new Exception("")), self()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
more descriptive exception please.
Context
Resource Cluster is a new abstraction to capture all the task executors that are registered with the mantis master.
Checklist
./gradlew build
compiles code correctly./gradlew test
passes all testsCONTRIBUTING.md