Skip to content

Commit

Permalink
Datastream stop transition redesign
Browse files Browse the repository at this point in the history
  • Loading branch information
Shrinand Thakkar committed Aug 6, 2021
1 parent 457ac60 commit 6c142db
Show file tree
Hide file tree
Showing 7 changed files with 400 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
"name" : "force",
"type" : "boolean",
"default" : "false",
"doc" : "whether or not to resume all datastreams within the given datastream's group"
"doc" : "whether or not to stop all datastreams within the given datastream's group"
} ]
} ]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ record Datastream {
PAUSED
DELETING
STOPPED
STOPPING
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"type" : {
"type" : "enum",
"name" : "DatastreamStatus",
"symbols" : [ "INITIALIZING", "READY", "PAUSED", "DELETING", "STOPPED" ]
"symbols" : [ "INITIALIZING", "READY", "PAUSED", "DELETING", "STOPPED", "STOPPING" ]
},
"doc" : "Status of the datastream",
"symbolDocs" : {
Expand Down Expand Up @@ -187,7 +187,7 @@
"name" : "force",
"type" : "boolean",
"default" : "false",
"doc" : "whether or not to resume all datastreams within the given datastream's group"
"doc" : "whether or not to stop all datastreams within the given datastream's group"
} ]
} ]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class DatastreamServer {
private final Map<String, String> _bootstrapConnectors;

private Coordinator _coordinator;
private Properties _properties;
private DatastreamStore _datastreamStore;
private DatastreamJettyStandaloneLauncher _jettyLauncher;
private JmxReporter _jmxReporter;
Expand Down Expand Up @@ -130,7 +131,8 @@ public class DatastreamServer {
public DatastreamServer(Properties properties) throws DatastreamException {
LOG.info("Start to initialize DatastreamServer. Properties: " + properties);
LOG.info("Creating coordinator.");
VerifiableProperties verifiableProperties = new VerifiableProperties(properties);
_properties = properties;
VerifiableProperties verifiableProperties = new VerifiableProperties(_properties);

HashSet<String> connectorTypes = new HashSet<>(verifiableProperties.getStringList(CONFIG_CONNECTOR_NAMES,
Collections.emptyList()));
Expand All @@ -148,7 +150,7 @@ public DatastreamServer(Properties properties) throws DatastreamException {
throw new DatastreamRuntimeException(errorMessage);
}

CoordinatorConfig coordinatorConfig = new CoordinatorConfig(properties);
CoordinatorConfig coordinatorConfig = new CoordinatorConfig(_properties);

LOG.info("Setting up DMS endpoint server.");
ZkClient zkClient = new ZkClient(coordinatorConfig.getZkAddress(), coordinatorConfig.getZkSessionTimeout(),
Expand Down Expand Up @@ -226,6 +228,10 @@ public DatastreamStore getDatastreamStore() {
return _datastreamStore;
}

/**
 * Get the {@link Properties} this server was constructed with.
 * The constructor stores the caller's instance as-is, so the object returned here is that
 * same instance rather than a copy.
 * @return the properties passed to the constructor
 */
public Properties getProperties() {
return _properties;
}

/**
 * Get the value of the {@code _httpPort} field.
 * NOTE(review): presumably the port the HTTP endpoint listens on; where the field is
 * assigned is not visible here, so confirm before relying on that.
 * @return the current value of {@code _httpPort}
 */
public int getHttpPort() {
return _httpPort;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
Expand All @@ -37,6 +38,7 @@
import com.linkedin.datastream.common.DatastreamStatus;
import com.linkedin.datastream.common.DatastreamUtils;
import com.linkedin.datastream.common.JsonUtils;
import com.linkedin.datastream.common.PollUtils;
import com.linkedin.datastream.common.RestliUtils;
import com.linkedin.datastream.metrics.BrooklinGaugeInfo;
import com.linkedin.datastream.metrics.BrooklinMeterInfo;
Expand All @@ -57,6 +59,7 @@
import com.linkedin.restli.server.PagingContext;
import com.linkedin.restli.server.PathKeys;
import com.linkedin.restli.server.ResourceLevel;
import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
import com.linkedin.restli.server.annotations.Action;
import com.linkedin.restli.server.annotations.ActionParam;
Expand All @@ -70,7 +73,6 @@

import static com.linkedin.datastream.common.DatastreamMetadataConstants.NUM_TASKS;


/**
* Resource classes are used by rest.li to process the corresponding HTTP requests.
* Note that rest.li will instantiate an object each time it processes a request.
Expand All @@ -97,12 +99,20 @@ public class DatastreamResources extends CollectionResourceTemplate<String, Data
private static final String CREATE_CALL_LATENCY_MS_STRING = "createCallLatencyMs";
private static final String DELETE_CALL_LATENCY_MS_STRING = "deleteCallLatencyMs";

// To support retries on the request timeouts
public static final String CONFIG_STOP_TRANSITION_TIMEOUT_MS = "stopTransitionTimeoutMs";
public static final String CONFIG_STOP_TRANSITION_RETRY_PERIOD_MS = "stopTransitionRetryPeriodMs";
// The defaults are decimal millisecond strings because the constructor hands them to Long.parseLong.
// Duration.toString() would produce ISO-8601 text such as "PT1M", which Long.parseLong rejects with a
// NumberFormatException.
private static final String STOP_TRANSITION_TIMEOUT_MS_DEFAULT = String.valueOf(Duration.ofMillis(60000).toMillis());
private static final String STOP_TRANSITION_RETRY_PERIOD_MS_DEFAULT = String.valueOf(Duration.ofMillis(1000).toMillis());

private final DatastreamStore _store;
private final Coordinator _coordinator;
private final ErrorLogger _errorLogger;

private final DynamicMetricsManager _dynamicMetricsManager;

private final Duration _stopTransitionTimeoutMs;
private final Duration _stopTransitionRetryPeriodMs;

/**
* Constructor for DatastreamResources
* @param datastreamServer the datastream server
Expand All @@ -117,13 +127,44 @@ public DatastreamResources(DatastreamServer datastreamServer) {
* @param coordinator the server coordinator
*/
public DatastreamResources(DatastreamStore store, Coordinator coordinator) {
// No server properties are supplied on this path; passing null makes the three-argument constructor
// use the default stop-transition timeout and retry period.
this(store, coordinator, null);
}

/**
 * Constructor for DatastreamResources
 *
 * <p>Besides wiring the store, coordinator, error logger and latency gauges, this reads the stop-transition
 * polling settings from {@code properties} under {@link #CONFIG_STOP_TRANSITION_TIMEOUT_MS} and
 * {@link #CONFIG_STOP_TRANSITION_RETRY_PERIOD_MS}. When {@code properties} is null or a key is absent, the
 * built-in default is used. When the configured pair is not usable (retry period not positive, or timeout not
 * strictly greater than the retry period), a warning is logged and both values fall back to the defaults.
 *
 * @param store the datastream store
 * @param coordinator the server coordinator
 * @param properties the properties of datastream server; may be null
 * @throws NumberFormatException if a configured value is not a decimal long
 */
public DatastreamResources(DatastreamStore store, Coordinator coordinator, Properties properties) {
  _store = store;
  _coordinator = coordinator;
  _errorLogger = new ErrorLogger(LOG, _coordinator.getInstanceName());

  _dynamicMetricsManager = DynamicMetricsManager.getInstance();
  _dynamicMetricsManager.registerGauge(CLASS_NAME, CREATE_CALL_LATENCY_MS_STRING, CREATE_CALL_LATENCY_MS_SUPPLIER);
  _dynamicMetricsManager.registerGauge(CLASS_NAME, DELETE_CALL_LATENCY_MS_STRING, DELETE_CALL_LATENCY_MS_SUPPLIER);

  // Properties.getProperty, unlike the inherited Map.getOrDefault, also consults the Properties defaults
  // chain and returns a String directly, so no unchecked cast is needed.
  long stopTransitionTimeoutMs = Long.parseLong(Objects.nonNull(properties)
      ? properties.getProperty(CONFIG_STOP_TRANSITION_TIMEOUT_MS, STOP_TRANSITION_TIMEOUT_MS_DEFAULT)
      : STOP_TRANSITION_TIMEOUT_MS_DEFAULT);

  long stopTransitionRetryPeriodMs = Long.parseLong(Objects.nonNull(properties)
      ? properties.getProperty(CONFIG_STOP_TRANSITION_RETRY_PERIOD_MS, STOP_TRANSITION_RETRY_PERIOD_MS_DEFAULT)
      : STOP_TRANSITION_RETRY_PERIOD_MS_DEFAULT);

  // The timeout must leave room for at least one retry period, and the period itself must be positive.
  if (stopTransitionRetryPeriodMs > 0 && stopTransitionTimeoutMs > stopTransitionRetryPeriodMs) {
    _stopTransitionTimeoutMs = Duration.ofMillis(stopTransitionTimeoutMs);
    _stopTransitionRetryPeriodMs = Duration.ofMillis(stopTransitionRetryPeriodMs);
  } else {
    LOG.warn("Illegal configurations provided, stopTransitionTimeoutMs={} stopTransitionRetryPeriodMs={}. Falling back to using default configurations",
        stopTransitionTimeoutMs, stopTransitionRetryPeriodMs);
    _stopTransitionTimeoutMs = Duration.ofMillis(Long.parseLong(STOP_TRANSITION_TIMEOUT_MS_DEFAULT));
    _stopTransitionRetryPeriodMs = Duration.ofMillis(Long.parseLong(STOP_TRANSITION_RETRY_PERIOD_MS_DEFAULT));
  }

  LOG.info("Datastream resources, created with stopTransitionRetryPeriodMs={} stopTransitionTimeoutMs={}",
      _stopTransitionRetryPeriodMs, _stopTransitionTimeoutMs);
}

/**
Expand Down Expand Up @@ -388,7 +429,7 @@ public ActionResult<Void> movePartitions(@PathKeysParam PathKeys pathKeys,
/**
* Stop a datastream
* @param pathKeys resource key containing the datastream name
* @param force whether or not to resume all datastreams within the given datastream's group
* @param force whether or not to stop all datastreams within the given datastream's group
* @return result HTTP status
*/
@Action(name = "stop", resourceLevel = ResourceLevel.ENTITY)
Expand All @@ -404,30 +445,49 @@ public ActionResult<Void> stop(@PathKeysParam PathKeys pathKeys,
"Datastream to stopped does not exist: " + datastreamName);
}

if (!DatastreamStatus.READY.equals(datastream.getStatus()) && !DatastreamStatus.PAUSED.equals(datastream.getStatus())) {
if (DatastreamStatus.STOPPED.equals(datastream.getStatus())) {
LOG.info("Datastream {} is already in STOPPED state", datastreamName);
return new ActionResult<>(HttpStatus.S_200_OK);
}

if (!DatastreamStatus.READY.equals(datastream.getStatus()) && !DatastreamStatus.PAUSED.equals(
datastream.getStatus()) && !DatastreamStatus.STOPPING.equals(datastream.getStatus())) {
_errorLogger.logAndThrowRestLiServiceException(HttpStatus.S_405_METHOD_NOT_ALLOWED,
"Can only pause a datastream in READY/PAUSED state: " + datastreamName);
String.format("Datastream %s is in %s state. Can only stop a datastream in READY/PAUSED state", datastreamName, datastream.getStatus()));
}

List<Datastream> datastreamsToStop =
force ? getGroupedDatastreams(datastream) : Collections.singletonList(datastream);
LOG.info("Stop datastreams {}", datastreamsToStop);
for (Datastream d : datastreamsToStop) {
try {
if (DatastreamStatus.READY.equals(datastream.getStatus()) || DatastreamStatus.PAUSED.equals(datastream.getStatus())) {
d.setStatus(DatastreamStatus.STOPPED);
if (DatastreamStatus.READY.equals(d.getStatus()) || DatastreamStatus.PAUSED.equals(d.getStatus())) {
d.setStatus(DatastreamStatus.STOPPING);
_store.updateDatastream(d.getName(), d, true);
_store.deleteDatastreamNumTasks(d.getName());
} else if (DatastreamStatus.STOPPING.equals(d.getStatus())) {
// this check helps in preventing any datastream from being stuck in STOPPING state indefinitely
LOG.warn("Datastream {} is already in {} state. Notifying leader to initiate transition", d,
d.getStatus());
_store.updateDatastream(d.getName(), d, true);
_store.deleteDatastreamNumTasks(d.getName());
} else {
LOG.warn("Cannot stop datastream {}, as it is not in READY/PAUSED state. State: {}", d, datastream.getStatus());
LOG.warn("Cannot stop datastream {}, as it is not in READY/PAUSED state. State: {}", d, d.getStatus());
}
} catch (DatastreamException e) {
_errorLogger.logAndThrowRestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
"Could not update datastream to STOPPED state: " + d.getName(), e);
"Could not update datastream to STOPPING state: " + d.getName(), e);
}
}

LOG.info("Completed request for stopping datastream {}", datastream);
// polls until the leader transitions the state of the datastream to STOPPED state
PollUtils.poll(() -> datastreamsToStop.stream()
.allMatch(ds -> _store.getDatastream(ds.getName()).getStatus().equals(DatastreamStatus.STOPPED)),
allStopped -> allStopped, _stopTransitionRetryPeriodMs.toMillis(), _stopTransitionTimeoutMs.toMillis())
.orElseThrow(() -> new RestLiServiceException(HttpStatus.S_408_REQUEST_TIMEOUT,
String.format("Stop request timed out for datastream: %s", datastreamName)));

LOG.info("Completed request for stopping datastream {}", _store.getDatastream(datastream.getName()));

return new ActionResult<>(HttpStatus.S_200_OK);
}
Expand Down
Loading

0 comments on commit 6c142db

Please sign in to comment.