-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Scheduler Integration with Mantis Master #183
Scheduler Integration with Mantis Master #183
Conversation
@@ -537,7 +538,15 @@ public void onWorkerEvent(WorkerEvent workerEvent) { | |||
if(!JobHelper.isTerminalWorkerEvent(workerEvent)) { | |||
logger.warn("Event from Worker {} for a cluster {} that no longer exists. Terminate worker", workerEvent, workerEvent.getWorkerId().getJobCluster()); | |||
Optional<String> host = JobHelper.getWorkerHostFromWorkerEvent(workerEvent); | |||
mantisScheduler.unscheduleAndTerminateWorker(workerEvent.getWorkerId(), host); | |||
Optional<JobDefinition> archivedJobDefinition = | |||
jobClusterInfoManager.getArchivedJobDefinition(workerEvent.getWorkerId().getJobId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Andyz26 can you check if this is valid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
@@ -143,6 +155,20 @@ public MasterMain(ConfigurationFactory configFactory, AuditEventSubscriber audit | |||
final MantisJobStore mantisJobStore = new MantisJobStore(storageProvider); | |||
final ActorRef jobClusterManagerActor = system.actorOf(JobClustersManagerActor.props(mantisJobStore, lifecycleEventPublisher), "JobClustersManager"); | |||
|
|||
// Beginning of new stuff |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: improve the comment?
Optional<IMantisJobMetadata> completedJobOptional = | ||
jobManager.getJobDataForCompletedJob(r.getWorkerId().getJobId()); | ||
if (completedJobOptional.isPresent()) { | ||
JobDefinition jobDefinition = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add log here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also shall we consider saving this state here into 'jobManager' in case some in-transit node keeps triggering this to create a new scheduler and attempt to unscheduled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, I wanted to take a stab at this. But, unfortunately, that class is super dense - so I thought I'll clean it up later.
import java.util.Properties; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.Consumer; | ||
import org.apache.flink.configuration.Configuration; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this going to be a short-term plan? I feel like this could cause some pain in the future upgrade if things changed on the flink side (I don't feel like this is something they will consider/maintain back-compat carefully though).
Context
This integrates the new federated scheduler factory that can schedule jobs on both mesos and the new task executors with the mantis master.
Checklist
./gradlew build
compiles code correctly./gradlew test
passes all testsCONTRIBUTING.md