Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Stopping Tasks On Assignment Change of Tasks #868

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ public abstract class AbstractKafkaConnector implements Connector, DiagnosticsAw
// multiple concurrent threads. If access is required to both maps then the order of synchronization must be
// _runningTasks followed by _tasksToStop to prevent deadlocks.
private final Map<DatastreamTask, ConnectorTaskEntry> _runningTasks = new HashMap<>();
private final Map<DatastreamTask, ConnectorTaskEntry> _tasksToStop = new HashMap<>();

// _tasksPendingStop contains the tasks that are pending stop across various assignment changes. The periodic health
// check will keep attempting to stop these tasks while they remain unstopped (e.g. stuck somewhere in the stop path).
private final Map<DatastreamTask, ConnectorTaskEntry> _tasksPendingStop = new HashMap<>();

// A daemon executor to constantly check whether all tasks are running and restart them if not.
private final ScheduledExecutorService _daemonThreadExecutorService =
Expand Down Expand Up @@ -154,23 +157,25 @@ public synchronized void onAssignmentChange(List<DatastreamTask> tasks) {
_logger.info("onAssignmentChange called with tasks {}", tasks);

synchronized (_runningTasks) {
shrinandthakkar marked this conversation as resolved.
Show resolved Hide resolved
Map<DatastreamTask, ConnectorTaskEntry> runningTasksToStop = new HashMap<>();
Set<DatastreamTask> toCancel = new HashSet<>(_runningTasks.keySet());
tasks.forEach(toCancel::remove);

if (toCancel.size() > 0) {
// Mark the connector task as stopped so that, in case stopping the task here fails for any reason in
// restartDeadTasks the task is not restarted
synchronized (_tasksToStop) {
synchronized (_tasksPendingStop) {
toCancel.forEach(task -> {
_tasksToStop.put(task, _runningTasks.get(task));
runningTasksToStop.put(task, _runningTasks.get(task));
_tasksPendingStop.put(task, _runningTasks.get(task));
_runningTasks.remove(task);
});
}
stopUnassignedTasks();
scheduleTasksToStop(runningTasksToStop);
}

boolean toCallRestartDeadTasks = false;
synchronized (_tasksToStop) {
synchronized (_tasksPendingStop) {
for (DatastreamTask task : tasks) {
ConnectorTaskEntry connectorTaskEntry = _runningTasks.get(task);
if (connectorTaskEntry != null) {
Expand All @@ -180,16 +185,17 @@ public synchronized void onAssignmentChange(List<DatastreamTask> tasks) {
// This is necessary because DatastreamTaskImpl.hashCode() does not take into account all the
// fields/properties of the DatastreamTask (e.g. dependencies).
_runningTasks.remove(task);
_runningTasks.put(task, connectorTaskEntry);
} else {
if (_tasksToStop.containsKey(task)) {
// If a pending stop task is reassigned to this host, we'd have to ensure to restart the
// task or replace the connectorTaskEntry for that task in the restartDeadTasks function.
if (_tasksPendingStop.containsKey(task)) {
shrinandthakkar marked this conversation as resolved.
Show resolved Hide resolved
toCallRestartDeadTasks = true;
connectorTaskEntry = _tasksToStop.remove(task);
connectorTaskEntry = _tasksPendingStop.remove(task);
} else {
connectorTaskEntry = createKafkaConnectorTask(task);
}
_runningTasks.put(task, connectorTaskEntry);
}
_runningTasks.put(task, connectorTaskEntry);
}
}
// If any tasks pending stop were re-assigned to this host we explicitly call restartDeadTasks to ensure
Expand Down Expand Up @@ -293,8 +299,8 @@ protected void restartDeadTasks() {
* Returns the number of tasks yet to be stopped.
*/
int getTasksToStopCount() {
synchronized (_tasksToStop) {
return _tasksToStop.size();
synchronized (_tasksPendingStop) {
return _tasksPendingStop.size();
}
}

Expand All @@ -308,44 +314,50 @@ int getRunningTasksCount() {
}

/**
* Attempt to stop the unassigned tasks.
* Attempt to stop the unassigned tasks from the _tasksToStop map.
*/
private void stopUnassignedTasks() {
synchronized (_tasksToStop) {
if (_tasksToStop.size() == 0) {
_logger.info("No tasks to stop");
return;
}
scheduleTasksToStop(_tasksPendingStop);
}

// Spawn a separate thread to attempt stopping the connectorTask. The connectorTask will be canceled if it
// does not stop within a certain amount of time. This will force cleanup of connectorTasks which take too long
// to stop, or are stuck indefinitely. A separate thread is spawned to handle this because the Coordinator
// requires that this step complete quickly because we call this from onAssignmentChange() (assignment thread gets
// killed if it takes too long) and restartDeadTasks which must complete quickly.
List<Future<DatastreamTask>> stopTaskFutures = _tasksToStop.keySet().stream()
.map(task -> asyncStopTask(task, _tasksToStop.get(task)))
.collect(Collectors.toList());
/**
* Attempt to stop the unassigned tasks from the argument map.
*/
private void scheduleTasksToStop(Map<DatastreamTask, ConnectorTaskEntry> tasks) {
if (tasks.size() == 0) {
_logger.info("No tasks to stop");
return;
}

_shutdownExecutorService.submit(() -> {
List<DatastreamTask> toRemoveTasks = stopTaskFutures.stream().map(stopTaskFuture -> {
try {
return stopTaskFuture.get(CANCEL_TASK_TIMEOUT.plus(POST_CANCEL_TASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
_logger.warn("Stop task future failed with exception", e);
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());

if (toRemoveTasks.size() > 0) {
synchronized (_tasksToStop) {
// It's possible that while the stop of the task was pending there was another onAssignmentChange event
// which reassigned the task back to this host and the task was moved back to _runningTasks. In this
// case the remove operation here will be a no-op.
toRemoveTasks.forEach(_tasksToStop::remove);
}
// Spawn a separate thread to attempt stopping the connectorTask. The connectorTask will be canceled if it
// does not stop within a certain amount of time. This will force cleanup of connectorTasks which take too long
// to stop, or are stuck indefinitely. A separate thread is spawned to handle this because the Coordinator
// requires that this step complete quickly because we call this from onAssignmentChange() (assignment thread gets
// killed if it takes too long) and restartDeadTasks which must complete quickly.
List<Future<DatastreamTask>> stopTaskFutures = tasks.keySet().stream()
.map(task -> asyncStopTask(task, tasks.get(task)))
.collect(Collectors.toList());

_shutdownExecutorService.submit(() -> {
List<DatastreamTask> toRemoveTasks = stopTaskFutures.stream().map(stopTaskFuture -> {
try {
return stopTaskFuture.get(CANCEL_TASK_TIMEOUT.plus(POST_CANCEL_TASK_TIMEOUT).toMillis(),
TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
_logger.warn("Stop task future failed with exception", e);
}
});
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());

if (toRemoveTasks.size() > 0) {
synchronized (_tasksPendingStop) {
// It's possible that while the stop of the task was pending there was another onAssignmentChange event
// which reassigned the task back to this host and the task was moved back to _runningTasks. In this
// case the remove operation here will be a no-op.
toRemoveTasks.forEach(_tasksPendingStop::remove);
}
}
});
}

@NotNull
Expand Down Expand Up @@ -420,10 +432,10 @@ public void stop() {
_runningTasks.forEach(this::asyncStopTask);
_runningTasks.clear();
}
synchronized (_tasksToStop) {
synchronized (_tasksPendingStop) {
// Try to stop the tasks
_tasksToStop.forEach(this::asyncStopTask);
_tasksToStop.clear();
_tasksPendingStop.forEach(this::asyncStopTask);
_tasksPendingStop.clear();
}
_logger.info("Start to shut down the shutdown executor and wait up to {} ms.",
SHUTDOWN_EXECUTOR_SHUTDOWN_TIMEOUT.toMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.slf4j.Logger;
Expand Down Expand Up @@ -138,6 +142,51 @@ public void testOnAssignmentChangeStopTaskFailure() {
connector.stop();
}

@Test
public void testOnAssignmentChangeMultipleReassignments() throws InterruptedException {
Properties props = new Properties();
// Reduce time interval between calls to restartDeadTasks to force invocation of stopTasks
props.setProperty("daemonThreadIntervalInSeconds", "2");
// With failStopTaskOnce set to true the AbstractKafkaBasedConnectorTask.stop is configured
// to fail the first time with InterruptedException and pass the second time.
TestKafkaConnector connector = new TestKafkaConnector(false, props, true);

// first task assignment assigns task 1
List<DatastreamTask> firstTaskAssignment = getTaskListInRange(1, 2);
connector.onAssignmentChange(firstTaskAssignment);
connector.start(null);
Assert.assertEquals(connector.getRunningTasksCount(), 1);

// second task assignment assigns task 2,3,4,5 and takes out task 1
List<DatastreamTask> secondTaskAssignment = getTaskListInRange(2, 6);

// during the assignment, the _taskToStop map count need to be less than 1, as only task 1 would be taken out.
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(() -> connector.onAssignmentChange(secondTaskAssignment));
executor.execute(() -> Assert.assertTrue(connector.getTasksToStopCount() <= 1));

awaitForExecution(executor, 50L);
Assert.assertTrue(connector.getTasksToStopCount() >= 1); // the count of the _taskToStopTracker
Assert.assertEquals(connector.getRunningTasksCount(), 4);

// second task assignment keeps task 5, assigns task 6,7,8 and takes out task 2,3,4
List<DatastreamTask> thirdTaskAssignment = getTaskListInRange(5, 9);

// during the assignment, the _taskToStop map count need to be less than 4, as task 2,3,4 would be taken out and task 1 if not already stopped.
executor = Executors.newFixedThreadPool(2);
executor.execute(() -> connector.onAssignmentChange(thirdTaskAssignment));
executor.execute(() -> Assert.assertTrue(connector.getTasksToStopCount() <= 4));

awaitForExecution(executor, 50L);
Assert.assertTrue(connector.getTasksToStopCount() >= 3); // the count of the _taskToStopTracker

// Wait for restartDeadTasks to be called to attempt another stopTasks call
PollUtils.poll(() -> connector.getCreateTaskCalled() >= 3, Duration.ofSeconds(1).toMillis(),
Duration.ofSeconds(10).toMillis());
Assert.assertEquals(connector.getRunningTasksCount(), 4);
connector.stop();
}

@Test
public void testCalculateThreadStartDelay() {
Properties props = new Properties();
Expand Down Expand Up @@ -191,6 +240,26 @@ public void testRestartThrowsException() {
connector.stop();
}

// helper method to generate the tasks in a range for assignment
private List<DatastreamTask> getTaskListInRange(int start, int end) {
List<DatastreamTask> taskAssignmentList = new ArrayList<>();
IntStream.range(start, end).forEach(index -> {
DatastreamTaskImpl dt = new DatastreamTaskImpl();
dt.setTaskPrefix("testtask" + index);
taskAssignmentList.add(dt);
});
return taskAssignmentList;
}

// helper method to await on the executor for the given timeout period
private void awaitForExecution(ExecutorService executor, Long timeUnitMs) throws InterruptedException {
try {
executor.awaitTermination(timeUnitMs, TimeUnit.MILLISECONDS);
} finally {
executor.shutdownNow();
}
}

/**
* Dummy implementation of {@link AbstractKafkaConnector} for testing purposes
*/
Expand Down