BMM Restart Improvements Part 2. Followers claim their tokens after handling stop assignments #921

Merged
merged 14 commits into from Jan 23, 2023
@@ -788,4 +788,21 @@ public Map<String, Map<Integer, Long>> getConsumptionLagForTask(DatastreamTask t
return taskEntry.getConnectorTask().getKafkaTopicPartitionTracker().getConsumptionLag();
}
}

/**
* {@inheritDoc}
*/
public List<String> getActiveTasks() {
List<DatastreamTask> tasks = new ArrayList<>();

synchronized (_runningTasks) {
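// Hold both locks while snapshotting so the combined view of running and
// pending-stop tasks stays consistent.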
synchronized (_tasksPendingStop) {
tasks.addAll(_runningTasks.keySet());
tasks.addAll(_tasksPendingStop.keySet());
}
}

_logger.debug("Found {} active tasks assigned to the connector", tasks.size());
return tasks.stream().map(DatastreamTask::getId).collect(Collectors.toList());
}
}
@@ -66,6 +66,15 @@ public interface Connector extends MetricsAware, DatastreamChangeListener {
*/
void initializeDatastream(Datastream stream, List<Datastream> allDatastreams) throws DatastreamValidationException;

/**
* Returns the IDs of tasks that are active on this connector (i.e. running or pending stop).
* @return A list of IDs for tasks that are currently active on this connector instance
* @throws UnsupportedOperationException if the Connector implementation does not override this method.
*/
default List<String> getActiveTasks() {
throw new UnsupportedOperationException("Running tasks API is not supported unless implemented by connectors");
}
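
For connectors that track their own assignments, the override can be as simple as reporting every task that has not yet finished stopping. The fragment below is a hypothetical illustration of the contract only (the _assignedTasks field is invented for the example, and imports for Set, ConcurrentHashMap and Collectors are omitted); the AbstractKafkaConnector.getActiveTasks() change earlier in this diff is the real in-tree implementation.

// Hypothetical connector fragment, shown only to illustrate the expected contract.
private final Set<DatastreamTask> _assignedTasks = ConcurrentHashMap.newKeySet();

@Override
public void onAssignmentChange(List<DatastreamTask> tasks) {
  _assignedTasks.addAll(tasks);
  // Tasks missing from the new assignment are stopped asynchronously; each one is
  // removed from _assignedTasks only after its processing has fully shut down, so
  // getActiveTasks() keeps reporting it while the stop is still in flight.
}

@Override
public List<String> getActiveTasks() {
  // Everything still in the set counts as active (running or pending stop).
  return _assignedTasks.stream().map(DatastreamTask::getId).collect(Collectors.toList());
}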

/**
* Validate the update datastreams operation. By default this is not supported. Any connectors that want to support
* datastream updates should override this method to perform the validation needed.
@@ -21,6 +21,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
@@ -30,6 +31,7 @@

import org.apache.commons.lang3.Validate;
import org.jetbrains.annotations.NotNull;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.mockito.invocation.Invocation;
import org.slf4j.Logger;
@@ -90,6 +92,8 @@
import static com.linkedin.datastream.common.DatastreamMetadataConstants.SYSTEM_DESTINATION_PREFIX;
import static com.linkedin.datastream.common.DatastreamMetadataConstants.TTL_MS;
import static com.linkedin.datastream.server.assignment.StickyMulticastStrategyFactory.DEFAULT_IMBALANCE_THRESHOLD;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anyObject;
@@ -3114,6 +3118,187 @@ void testOnSessionExpired(boolean handleNewSession) throws DatastreamException,
instance1.getDatastreamCache().getZkclient().close();
}

@Test
public void testInferStoppingStreamsFromAssignment() {
String connectorType = "testConnector";
Datastream ketchupStream = DatastreamTestUtils.createDatastream(connectorType, "ketchupStream", "random");
Datastream mayoStream = DatastreamTestUtils.createDatastream(connectorType, "mayoStream", "random");
Datastream mustardStream = DatastreamTestUtils.createDatastream(connectorType, "mustardStream", "random");

DatastreamTaskImpl task1 = new DatastreamTaskImpl();
task1.setTaskPrefix(ketchupStream.getName());
task1.setDatastreams(Collections.singletonList(ketchupStream));
DatastreamTaskImpl task2 = new DatastreamTaskImpl();
task2.setTaskPrefix(mustardStream.getName());
task2.setDatastreams(Collections.singletonList(mustardStream));
List<DatastreamTask> newAssignment = Arrays.asList(task1, task2);

DatastreamTaskImpl task3 = new DatastreamTaskImpl();
task3.setTaskPrefix(mayoStream.getName());
task3.setDatastreams(Collections.singletonList(mayoStream));
List<DatastreamTask> removedTasks = Collections.singletonList(task3);

List<Datastream> stoppingStreams = Coordinator.inferStoppingDatastreamsFromAssignment(newAssignment, removedTasks);
Assert.assertEquals(stoppingStreams.size(), 1);
Assert.assertEquals(stoppingStreams.get(0), mayoStream);

removedTasks = Arrays.asList(task2, task3);
stoppingStreams = Coordinator.inferStoppingDatastreamsFromAssignment(newAssignment, removedTasks);
Assert.assertEquals(stoppingStreams.size(), 1);
Assert.assertEquals(stoppingStreams.get(0), mayoStream);

newAssignment = Arrays.asList(task1, task2, task3);
stoppingStreams = Coordinator.inferStoppingDatastreamsFromAssignment(newAssignment, removedTasks);
Assert.assertEquals(stoppingStreams.size(), 0);
}
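
The three scenarios above pin the inference down to: a datastream is stopping when it backs a removed task and no task in the new assignment still references it. A minimal sketch of logic consistent with these assertions, assuming (but not guaranteeing) that the production Coordinator.inferStoppingDatastreamsFromAssignment is equivalent:

static List<Datastream> inferStoppingDatastreamsFromAssignment(
    List<DatastreamTask> newAssignment, List<DatastreamTask> removedTasks) {
  // Names of datastreams that still have at least one task in the new assignment.
  Set<String> stillAssigned = newAssignment.stream()
      .flatMap(task -> task.getDatastreams().stream())
      .map(Datastream::getName)
      .collect(Collectors.toSet());
  // Datastreams backing removed tasks that no longer appear anywhere in the new assignment.
  return removedTasks.stream()
      .flatMap(task -> task.getDatastreams().stream())
      .filter(stream -> !stillAssigned.contains(stream.getName()))
      .distinct()
      .collect(Collectors.toList());
}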

@Test
public void testClaimAssignmentTokensForStoppingStreams() throws Exception {
String testCluster = "testClaimAssignmentTokensForStoppingStreams";
String pizzaConnectorName = "pizzaConnector";
String thinCrustStream = "thinCrustStream";
String deepDishStream = "deepDishStream";

String bagelConnectorName = "bagelConnector";
String eggBagelStream = "eggBagelStream";

Properties props = new Properties();
props.put(CoordinatorConfig.CONFIG_CLUSTER, testCluster);
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Coordinator coordinator = new TestCoordinatorWithSpyZkAdapter(_cachedDatastreamReader, props);
Connector pizzaConnector = new TestHookConnector(pizzaConnectorName, pizzaConnectorName);
Connector bagelConnector = new TestHookConnector(bagelConnectorName, bagelConnectorName);
coordinator.addConnector(pizzaConnectorName, pizzaConnector, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);
coordinator.addConnector(bagelConnectorName, bagelConnector, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);
coordinator.start();

Datastream[] pizzaStreams = DatastreamTestUtils.createAndStoreDatastreams(
zkClient, testCluster, pizzaConnectorName, thinCrustStream, deepDishStream);
Datastream[] bagelStreams = DatastreamTestUtils.createAndStoreDatastreams(zkClient,
testCluster, bagelConnectorName, eggBagelStream);

DatastreamTaskImpl pizzaTask1 = new DatastreamTaskImpl();
pizzaTask1.setConnectorType(pizzaConnectorName);
pizzaTask1.setTaskPrefix(thinCrustStream);
pizzaTask1.setDatastreams(Collections.singletonList(pizzaStreams[0]));

DatastreamTaskImpl pizzaTask2 = new DatastreamTaskImpl();
pizzaTask2.setConnectorType(pizzaConnectorName);
pizzaTask2.setTaskPrefix(deepDishStream);
pizzaTask2.setDatastreams(Collections.singletonList(pizzaStreams[1]));

DatastreamTaskImpl bagelTask1 = new DatastreamTaskImpl();
bagelTask1.setConnectorType(bagelConnectorName);
bagelTask1.setTaskPrefix(eggBagelStream);
bagelTask1.setDatastreams(Collections.singletonList(bagelStreams[0]));

List<DatastreamTask> oldAssignment = new ArrayList<>(Arrays.asList(pizzaTask1, pizzaTask2, bagelTask1));
List<DatastreamTask> newAssignment = new ArrayList<>(Collections.singletonList(pizzaTask1));

ZkAdapter spyZkAdapter = coordinator.getZkAdapter();
coordinator.maybeClaimAssignmentTokensForStoppingStreams(newAssignment, oldAssignment);

// Verify that claimAssignmentTokensForDatastreams is called twice:
// (1) For deepDishStream, since it no longer has tasks in the new assignment
// (2) For eggBagelStream, since the bagel connector has no tasks in the new assignment (connector stopped case)
CollectionContainsMatcher<Datastream> deepDishStreamMatcher = new CollectionContainsMatcher<>(pizzaStreams[1]);
CollectionContainsMatcher<Datastream> eggBagelStreamMatcher = new CollectionContainsMatcher<>(bagelStreams[0]);
verify(spyZkAdapter, times(2)).claimAssignmentTokensForDatastreams(any(), any());
verify(spyZkAdapter, times(1)).
claimAssignmentTokensForDatastreams(argThat(deepDishStreamMatcher), any());
verify(spyZkAdapter, times(1)).
claimAssignmentTokensForDatastreams(argThat(eggBagelStreamMatcher), any());

zkClient.close();
coordinator.stop();
}
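
For readers unfamiliar with the follower-side flow, here is a rough sketch of what maybeClaimAssignmentTokensForStoppingStreams is expected to do based on this test. The method shape, the _adapter field, getInstanceName(), and the second argument to claimAssignmentTokensForDatastreams are assumptions rather than the actual Coordinator code, and the real method also waits for tasks to stop (see the next test):

void maybeClaimAssignmentTokensForStoppingStreams(List<DatastreamTask> newAssignment,
    List<DatastreamTask> oldAssignment) {
  // Tasks present in the old assignment but absent from the new one are being removed.
  List<DatastreamTask> removedTasks = new ArrayList<>(oldAssignment);
  removedTasks.removeAll(newAssignment);

  // Group removed tasks by connector so a fully stopped connector's streams are covered too.
  Map<String, List<DatastreamTask>> removedByConnector = removedTasks.stream()
      .collect(Collectors.groupingBy(DatastreamTask::getConnectorType));

  removedByConnector.forEach((connectorName, tasks) -> {
    List<Datastream> stoppingStreams = inferStoppingDatastreamsFromAssignment(newAssignment, tasks);
    Set<String> removedTaskIds = tasks.stream().map(DatastreamTask::getId).collect(Collectors.toSet());
    // Claim tokens only once the connector no longer reports the removed tasks as active.
    if (!stoppingStreams.isEmpty() && connectorTasksHaveStopped(connectorName, removedTaskIds)) {
      _adapter.claimAssignmentTokensForDatastreams(stoppingStreams, getInstanceName());
    }
  });
}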

@Test
public void testTokensNotClaimedForConnectorThatFailedToStop() throws Exception {
String testCluster = "testTokensNotClaimedForConnectorThatFailedToStop";
String connectorName = "mockConnector";
String streamName = "someStream";

Properties props = new Properties();
props.put(CoordinatorConfig.CONFIG_CLUSTER, testCluster);
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_TASK_STOP_CHECK_TIMEOUT_MS, "100");
props.put(CoordinatorConfig.CONFIG_TASK_STOP_CHECK_RETRY_PERIOD_MS, "10");

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Coordinator coordinator = new TestCoordinatorWithSpyZkAdapter(_cachedDatastreamReader, props);
Connector mockConnector = Mockito.mock(Connector.class);
coordinator.addConnector(connectorName, mockConnector, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);
coordinator.start();

Datastream testStream = DatastreamTestUtils.
createAndStoreDatastreams(zkClient, testCluster, connectorName, streamName)[0];
DatastreamTaskImpl task1 = new DatastreamTaskImpl();
task1.setConnectorType(connectorName);
task1.setTaskPrefix(streamName);
task1.setDatastreams(Collections.singletonList(testStream));

List<DatastreamTask> oldAssignment = new ArrayList<>(Collections.singletonList(task1));
List<DatastreamTask> newAssignment = Collections.emptyList();

// task1 never stops
when(mockConnector.getActiveTasks()).thenReturn(Collections.singletonList(task1.getId()));
coordinator.maybeClaimAssignmentTokensForStoppingStreams(newAssignment, oldAssignment);
ZkAdapter spyZkAdapter = coordinator.getZkAdapter();

// Verify that:
// (1) Active tasks are queried from the connector at least once
// (2) No tokens are claimed for the stream since the task never stops
verify(mockConnector, Mockito.atLeast(1)).getActiveTasks();
verify(spyZkAdapter, times(0)).claimAssignmentTokensForDatastreams(any(), any());
}
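
The two properties set above (CONFIG_TASK_STOP_CHECK_TIMEOUT_MS and CONFIG_TASK_STOP_CHECK_RETRY_PERIOD_MS) suggest the stop check is retried on a fixed period until a timeout expires. A self-contained sketch of such a wait loop, assuming those semantics; the production Coordinator may use a shared polling utility instead:

// Returns true if all the given tasks stop within timeoutMs, polling every retryPeriodMs.
boolean waitForTasksToStop(String connectorName, Set<String> taskIds, long retryPeriodMs, long timeoutMs)
    throws InterruptedException {
  long deadline = System.currentTimeMillis() + timeoutMs;
  while (System.currentTimeMillis() < deadline) {
    if (connectorTasksHaveStopped(connectorName, taskIds)) {
      return true;
    }
    Thread.sleep(retryPeriodMs);
  }
  // One final check at the deadline before giving up.
  return connectorTasksHaveStopped(connectorName, taskIds);
}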

@Test
public void testConnectorTasksHaveStopped() throws Exception {
String testCluster = "testConnectorTasksHaveStopped";
String connectorName = "connector";
String streamName = "stream";
Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster);
Connector connector = Mockito.mock(Connector.class);
coordinator.addConnector(connectorName, connector, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);

List<DatastreamTaskImpl> allTasks = new ArrayList<>(Arrays.asList(
new DatastreamTaskImpl(),
new DatastreamTaskImpl(),
new DatastreamTaskImpl(),
new DatastreamTaskImpl()));
allTasks.forEach(t -> t.setTaskPrefix(streamName));
allTasks.forEach(t -> t.setConnectorType(connectorName));
allTasks.forEach(t -> t.setId(UUID.randomUUID().toString())); // setting unique IDs for tasks
Set<String> allTaskIds = allTasks.stream().map(DatastreamTaskImpl::getId).collect(Collectors.toSet());

when(connector.getActiveTasks()).thenReturn(Collections.emptyList());
Assert.assertTrue(coordinator.connectorTasksHaveStopped(connectorName, allTaskIds));

when(connector.getActiveTasks()).thenReturn(new ArrayList<>(allTaskIds));
Assert.assertFalse(coordinator.connectorTasksHaveStopped(connectorName, allTaskIds));
Assert.assertFalse(coordinator.connectorTasksHaveStopped(connectorName,
new HashSet<>(Collections.singletonList(allTasks.get(0).getId()))));
Assert.assertTrue(coordinator.connectorTasksHaveStopped(connectorName, Collections.emptySet()));

when(connector.getActiveTasks()).thenReturn(Collections.singletonList(allTasks.get(0).getId()));
Assert.assertFalse(coordinator.connectorTasksHaveStopped(connectorName, allTaskIds));
Assert.assertTrue(coordinator.connectorTasksHaveStopped(connectorName,
new HashSet<>(Collections.singletonList(allTasks.get(1).getId()))));
}
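
The assertions above reduce connectorTasksHaveStopped to a disjointness check: the given tasks have stopped exactly when none of their IDs are still reported by getActiveTasks(). A sketch consistent with these assertions; getConnector is a hypothetical lookup and the production method may differ in details:

boolean connectorTasksHaveStopped(String connectorName, Set<String> taskIds) {
  // IDs the connector still considers active (running or pending stop).
  Set<String> activeIds = new HashSet<>(getConnector(connectorName).getActiveTasks());
  // Stopped only if none of the tasks we are waiting on are still active.
  activeIds.retainAll(taskIds);
  return activeIds.isEmpty();
}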

// helper method: assert that within a timeout value, the connector is assigned the specific
// tasks with the specified names.
private void assertConnectorAssignment(TestHookConnector connector, long timeoutMs, String... datastreamNames)
@@ -3158,6 +3343,25 @@ private void deleteLiveInstanceNode(ZkClient zkClient, String cluster, Coordinat
zkClient.deleteRecursive(path);
}

static class CollectionContainsMatcher<T> extends ArgumentMatcher<List<T>> {
private final T _element;

public CollectionContainsMatcher(T element) {
_element = element;
}

@Override
@SuppressWarnings("unchecked")
public boolean matches(Object argument) {
if (!(argument instanceof List)) {
return false;
}

List<T> argumentAsList = (List<T>) argument;
return argumentAsList.contains(_element);
}
}

/**
* Base class of test Connector implementations
*/
@@ -3256,6 +3460,11 @@ public String toString() {
public List<BrooklinMetricInfo> getMetricInfos() {
return null;
}

@Override
public List<String> getActiveTasks() {
return Collections.emptyList();
}
}

/**
@@ -5,6 +5,9 @@
*/
package com.linkedin.datastream.server;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import com.linkedin.datastream.common.JsonUtils;


@@ -13,24 +16,20 @@
* they handled assignment change
*/
public final class AssignmentToken {
private String _issuedBy;
private String _issuedFor;
private long _timestamp;
private final String _issuedBy;
private final String _issuedFor;
private final long _timestamp;

/**
* Constructor for {@link AssignmentToken}
*/
public AssignmentToken(String issuedBy, String issuedFor) {
@JsonCreator
public AssignmentToken(@JsonProperty("issuedBy") String issuedBy,
@JsonProperty("issuedFor") String issuedFor,
@JsonProperty("timestamp") long timestamp) {
_issuedBy = issuedBy;
_issuedFor = issuedFor;
_timestamp = System.currentTimeMillis();
}

/**
* Default constructor for {@link AssignmentToken}, required for json ser/de
*/
public AssignmentToken() {

_timestamp = timestamp;
}

/**
Expand All @@ -50,42 +49,24 @@ public String toJson() {
/**
* Gets the name of the leader host that issued the token
*/
@JsonProperty("issuedBy")
public String getIssuedBy() {
return _issuedBy;
}

/**
* Gets the name of the host for which the token was issued
*/
@JsonProperty("issuedFor")
public String getIssuedFor() {
return _issuedFor;
}

/**
* Gets the timestamp (in UNIX epoch format) for when the token was issued
*/
@JsonProperty("timestamp")
public long getTimestamp() {
return _timestamp;
}

/**
* Sets the name of the leader host that issued the token
*/
public void setIssuedBy(String issuedBy) {
_issuedBy = issuedBy;
}

/**
* Sets the name of the host for which the token was issued
*/
public void setIssuedFor(String issuedFor) {
_issuedFor = issuedFor;
}

/**
* Sets the timestamp (in UNIX epoch format) for when the token was issued
*/
public void setTimestamp(long timestamp) {
_timestamp = timestamp;
}
}
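
A brief note on the AssignmentToken change above: replacing the setters and no-arg constructor with a @JsonCreator constructor makes the token immutable while keeping JSON deserialization working through the annotated constructor parameters, and callers now supply the timestamp explicitly instead of it being captured at construction time. A minimal round-trip sketch, assuming JsonUtils.fromJson is the counterpart of the toJson() shown above:

AssignmentToken token = new AssignmentToken("leader-host", "follower-host", System.currentTimeMillis());
String json = token.toJson();
AssignmentToken decoded = JsonUtils.fromJson(json, AssignmentToken.class);
// The decoded token carries the same issuer, target host and timestamp.
assert decoded.getIssuedBy().equals(token.getIssuedBy());
assert decoded.getTimestamp() == token.getTimestamp();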