Skip to content

Commit

Permalink
Finished implementation after merging linkedin#921
Browse files Browse the repository at this point in the history
  • Loading branch information
jzakaryan committed Jan 24, 2023
1 parent 63f874e commit 12569da
Showing 1 changed file with 15 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1351,7 +1351,7 @@ private void handleLeaderDoAssignment(boolean cleanUpOrphanNodes) {
map(AssignmentToken::getIssuedFor).collect(Collectors.toSet());
_log.error("Stop failed to propagate within {}ms for streams: {}. The following hosts failed to claim their token(s): {}",
_config.getStopPropagationTimeout(), failedStreams, hosts);
revokeUnclaimedAssignmentTokens(unclaimedTokens);
revokeUnclaimedAssignmentTokens(unclaimedTokens, stoppingDatastreamGroups);
_metrics.updateMeter(CoordinatorMetrics.Meter.NUM_FAILED_STOPS, failedStreams.size());
}

Expand Down Expand Up @@ -1401,13 +1401,22 @@ private void handleLeaderDoAssignment(boolean cleanUpOrphanNodes) {
}
}

private void revokeUnclaimedAssignmentTokens(Map<String, List<AssignmentToken>> unclaimedTokens) {
private void revokeUnclaimedAssignmentTokens(Map<String, List<AssignmentToken>> unclaimedTokens,
List<DatastreamGroup> stoppingDatastreamGroups) {
_log.info("Revoking unclaimed tokens");
for (String stream : unclaimedTokens.keySet()) {
List<String> instances = unclaimedTokens.get(stream).stream().map(AssignmentToken::getIssuedFor).
Map<String, Datastream> datastreamMap = new HashMap<>();
stoppingDatastreamGroups.forEach(dg -> dg.getDatastreams().forEach(ds -> datastreamMap.put(ds.getName(), ds)));
for (String streamName : unclaimedTokens.keySet()) {
Datastream stream = datastreamMap.get(streamName);

if (stream == null) {
_log.warn("Failed to claim token for unknown datastream: {}", streamName);
continue;
}

List<String> instances = unclaimedTokens.get(streamName).stream().map(AssignmentToken::getIssuedFor).
collect(Collectors.toList());
// TODO Uncomment after #921
//instances.forEach(i -> _adapter.claimAssignmentTokensForDatastreams(Collections.singletonList(stream), i));
instances.forEach(i -> _adapter.claimAssignmentTokensForDatastreams(Collections.singletonList(stream), i));
}
}

Expand Down

0 comments on commit 12569da

Please sign in to comment.