Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BMM Restart Improvements Part 2. Followers claim their tokens after handling stop assignments #921

Merged
merged 14 commits into from
Jan 23, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public interface Connector extends MetricsAware, DatastreamChangeListener {
* @throws UnsupportedOperationException if not implemented by Connector classes.
*/
default List<String> getActiveTasks() {
throw new UnsupportedOperationException("Running tasks API is not supported unless implemented by coordinators");
throw new UnsupportedOperationException("Running tasks API is not supported unless implemented by connectors");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,11 +716,11 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx

if (_config.getEnableAssignmentTokens()) {
try {
ExecutorService executor = Executors.newSingleThreadExecutor();
jzakaryan marked this conversation as resolved.
Show resolved Hide resolved
// Queue assignment token claim task
_executor.schedule(() -> maybeClaimAssignmentTokensForStoppingStreams(newAssignment, oldAssignment),
0, TimeUnit.MILLISECONDS);
executor.submit(() -> maybeClaimAssignmentTokensForStoppingStreams(newAssignment, oldAssignment));
} catch (RejectedExecutionException ex) {
_log.warn("Failed to schedule the task for claiming assignment tokens", ex);
_log.warn("Failed to submit the task for claiming assignment tokens", ex);
}
}

Expand Down Expand Up @@ -828,6 +828,7 @@ void maybeClaimAssignmentTokensForStoppingStreams(List<DatastreamTask> newAssign
filter(t -> stoppingStreamNames.contains(t.getTaskPrefix())).
map(DatastreamTask::getId).collect(Collectors.toSet());

// TODO Evaluate whether we need to optimize here and make this call for each datastream
if (PollUtils.poll(() -> connectorTasksHaveStopped(connector, stoppingDatastreamTasks),
vmaheshw marked this conversation as resolved.
Show resolved Hide resolved
_config.getTaskStopCheckRetryPeriodMs(), _config.getTaskStopCheckTimeoutMs())) {
_adapter.claimAssignmentTokensForDatastreams(stoppingStreams, _adapter.getInstanceName());
Expand Down