Skip to content

Commit

Permalink
Setting hostname as part of heartbeat (#180)
Browse files Browse the repository at this point in the history
Every heartbeat sent from the new task executor will contain the hostname from which it is sent. Note that this field already exists but wasn't used by the Mesos-based task executor. The new task executor will use it.

Co-authored-by: Sundaram Ananthanarayanan <sananthanarayanan@netflix.com>
  • Loading branch information
sundargates and sundargates authored Apr 16, 2022
1 parent f6938d2 commit 925b330
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -39,11 +40,18 @@ class Heartbeat {
private final int workerNumber;
private final ConcurrentMap<String, String> payloads;
private final BlockingQueue<PayloadPair> singleUsePayloads = new LinkedBlockingQueue<>();
private final Optional<String> host;

Heartbeat(String jobId, int stageNumber, int workerIndex, int workerNumber) {
this(jobId, stageNumber, workerIndex, workerNumber, Optional.empty());
}

Heartbeat(String jobId, int stageNumber, int workerIndex, int workerNumber, Optional<String> host) {
this.jobId = jobId;
this.stageNumber = stageNumber;
this.workerIndex = workerIndex;
this.workerNumber = workerNumber;
this.host = host;
payloads = new ConcurrentHashMap<>();
}

Expand Down Expand Up @@ -79,6 +87,7 @@ Status getCurrentHeartbeatStatus() {
payloadList.add(new Status.Payload(entry.getKey(), entry.getValue()));
}
Status status = new Status(jobId, stageNumber, workerIndex, workerNumber, Status.TYPE.HEARTBEAT, "heartbeat", MantisJobState.Noop);
host.ifPresent(status::setHostname);
if (!payloadList.isEmpty())
status.setPayloads(payloadList);
return status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
import rx.Subscription;
import rx.subjects.PublishSubject;


/**
* This class is the executable entry point for the worker. It constructs the related components (LeaderService),
* and starts them.
*/
public class MantisWorker extends BaseService {

private static final Logger logger = LoggerFactory.getLogger(MantisWorker.class);
Expand Down Expand Up @@ -109,7 +112,8 @@ public void start() {
.Factory
.forEphemeralJobsThatNeedToBeKilledInAbsenceOfSubscriber(
gateway,
Clock.systemDefaultZone()));
Clock.systemDefaultZone()),
Optional.empty());
taskStatusUpdateSubscription =
task
.getStatus()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class Task extends AbstractIdleService {

private final PublishSubject<Observable<Status>> tasksStatusSubject = PublishSubject.create();

private final Optional<String> hostname;

@Override
public void startUp() {
try {
Expand All @@ -78,7 +80,9 @@ public void doRun() throws Exception {
vmTaskStatusSubject,
masterMonitor,
config,
workerMetricsClient, sinkSubscriptionStateHandlerFactory),
workerMetricsClient,
sinkSubscriptionStateHandlerFactory,
hostname),
getJobProviderClass(), classLoaderHandle, null));

log.info("Starting Mantis Worker for task {}", this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
private final TaskExecutorRegistration taskExecutorRegistration;
private final CompletableFuture<Void> startFuture = new CompletableFuture<>();
private final ExecutorService ioExecutor;
private final String hostName;

// the reason the MantisMasterGateway field is not final is because we expect the HighAvailabilityServices
// to be started before we can get the MantisMasterGateway
Expand Down Expand Up @@ -125,9 +126,10 @@ public TaskExecutor(
Hardware.getSizeOfPhysicalMemory(),
Hardware.getSizeOfDisk(),
workerPorts.getNumberOfPorts());
this.hostName = workerConfiguration.getExternalAddress();
this.taskExecutorRegistration =
new TaskExecutorRegistration(
taskExecutorID, clusterID, getAddress(), workerConfiguration.getExternalAddress(), workerPorts, machineDefinition);
taskExecutorID, clusterID, getAddress(), hostName, workerPorts, machineDefinition);
this.ioExecutor =
Executors.newFixedThreadPool(
Hardware.getNumberCPUCores(),
Expand Down Expand Up @@ -412,7 +414,8 @@ public CompletableFuture<Ack> submitTask(ExecuteStageRequest request) {
workerConfiguration,
masterMonitor,
classLoaderHandle,
subscriptionStateHandlerFactory);
subscriptionStateHandlerFactory,
Optional.of(getHostname()));

setCurrentTask(task);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,15 @@ public class WorkerExecutionOperationsNetworkStage implements WorkerExecutionOpe
private Action0 onSinkUnsubscribe = null;
private final List<Closeable> closeables = new ArrayList<>();
private final ScheduledExecutorService scheduledExecutorService;

public WorkerExecutionOperationsNetworkStage(
Observer<VirtualMachineTaskStatus> vmTaskStatusObserver,
MantisMasterGateway mantisMasterApi, WorkerConfiguration config,
Factory sinkSubscriptionStateHandlerFactory) {
this(vmTaskStatusObserver, mantisMasterApi, config, null, sinkSubscriptionStateHandlerFactory);
}
private final Optional<String> hostname;

public WorkerExecutionOperationsNetworkStage(
Observer<VirtualMachineTaskStatus> vmTaskStatusObserver,
MantisMasterGateway mantisMasterApi,
WorkerConfiguration config,
WorkerMetricsClient workerMetricsClient,
Factory sinkSubscriptionStateHandlerFactory) {
Factory sinkSubscriptionStateHandlerFactory,
Optional<String> hostname) {
this.vmTaskStatusObserver = vmTaskStatusObserver;
this.mantisMasterApi = mantisMasterApi;
this.config = config;
Expand All @@ -137,6 +132,7 @@ public WorkerExecutionOperationsNetworkStage(
ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.worker.locate.spectator.registry", "true");
lookupSpectatorRegistry = Boolean.valueOf(locateSpectatorRegistry);
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
this.hostname = hostname;
}

/**
Expand Down Expand Up @@ -388,7 +384,7 @@ public void executeStage(final ExecutionDetails setup) throws IOException {
rw.setContext(context);
// setup heartbeats
heartbeatRef.set(new Heartbeat(rw.getJobId(),
rw.getStageNum(), rw.getWorkerIndex(), rw.getWorkerNum()));
rw.getStageNum(), rw.getWorkerIndex(), rw.getWorkerNum(), hostname));
final double networkMbps = executionRequest.getSchedulingInfo().forStage(rw.getStageNum()).getMachineDefinition().getNetworkMbps();
Closeable heartbeatCloseable = startSendingHeartbeats(rw.getJobStatus(),
new WorkerId(executionRequest.getJobId(), executionRequest.getWorkerIndex(),
Expand Down

0 comments on commit 925b330

Please sign in to comment.