Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored StickyPartitionAssignmentStrategy and implemented task estimation logic in LoadBasedPartitionAssignmentStrategy #835

Merged
merged 12 commits into from
Jun 17, 2021
Next Next commit
Removed DatastreamSourceClusterResolver and added a method in
PartitionThroughputProvider instead
jzakaryan committed Jun 9, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit b61285473357405073e3907452d28f0132a5d4dc

This file was deleted.

Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
import java.util.Map;

import com.linkedin.datastream.server.ClusterThroughputInfo;
import com.linkedin.datastream.server.DatastreamGroup;


/**
@@ -23,6 +24,13 @@ public interface PartitionThroughputProvider {
*/
ClusterThroughputInfo getThroughputInfo(String clusterName);

/**
* Retrieves per-partition throughput information for the given datastream group
* @param datastreamGroup Datastream group
* @return Throughput information for the provided datastream group
*/
ClusterThroughputInfo getThroughputInfo(DatastreamGroup datastreamGroup);

/**
* Retrieves per-partition throughput information for all clusters
* @return A map, where keys are cluster names and values are throughput information for the cluster

This file was deleted.

Original file line number Diff line number Diff line change
@@ -24,7 +24,6 @@
import com.linkedin.datastream.server.ClusterThroughputInfo;
import com.linkedin.datastream.server.DatastreamGroup;
import com.linkedin.datastream.server.DatastreamGroupPartitionsMetadata;
import com.linkedin.datastream.server.DatastreamSourceClusterResolver;
import com.linkedin.datastream.server.DatastreamTask;
import com.linkedin.datastream.server.Pair;
import com.linkedin.datastream.server.providers.PartitionThroughputProvider;
@@ -45,7 +44,6 @@ public class LoadBasedPartitionAssignmentStrategy extends StickyPartitionAssignm
private static final int TASK_CAPACITY_UTILIZATION_PCT_DEFAULT = 90;

private final PartitionThroughputProvider _throughputProvider;
private final DatastreamSourceClusterResolver _sourceClusterResolver;
private final int _taskCapacityMBps;
private final int _taskCapacityUtilizationPct;

@@ -54,15 +52,13 @@ public class LoadBasedPartitionAssignmentStrategy extends StickyPartitionAssignm
* Creates an instance of {@link LoadBasedPartitionAssignmentStrategy}
*/
public LoadBasedPartitionAssignmentStrategy(PartitionThroughputProvider throughputProvider,
DatastreamSourceClusterResolver sourceClusterResolver, Optional<Integer> maxTasks,
Optional<Integer> imbalanceThreshold, Optional<Integer> maxPartitionPerTask, boolean enableElasticTaskAssignment,
Optional<Integer> partitionsPerTask, Optional<Integer> partitionFullnessFactorPct,
Optional<Integer> taskCapacityMBps, Optional<Integer> taskCapacityUtilizationPct, Optional<ZkClient> zkClient,
String clusterName) {
Optional<Integer> maxTasks, Optional<Integer> imbalanceThreshold, Optional<Integer> maxPartitionPerTask,
boolean enableElasticTaskAssignment, Optional<Integer> partitionsPerTask,
Optional<Integer> partitionFullnessFactorPct, Optional<Integer> taskCapacityMBps,
Optional<Integer> taskCapacityUtilizationPct, Optional<ZkClient> zkClient, String clusterName) {
super(maxTasks, imbalanceThreshold, maxPartitionPerTask, enableElasticTaskAssignment, partitionsPerTask,
partitionFullnessFactorPct, zkClient, clusterName);
_throughputProvider = throughputProvider;
_sourceClusterResolver = sourceClusterResolver;
_taskCapacityMBps = taskCapacityMBps.orElse(TASK_CAPACITY_MBPS_DEFAULT);
_taskCapacityUtilizationPct = taskCapacityUtilizationPct.orElse(TASK_CAPACITY_UTILIZATION_PCT_DEFAULT);
}
@@ -95,8 +91,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<Datastr
"Zero tasks assigned. Retry leader partition assignment.");

// NOTE(review): cluster-name resolution was removed in this change; the lookup below
// uses a null placeholder key — confirm the intended lookup key for throughput info
String clusterName = _sourceClusterResolver.getSourceCluster(datastreamPartitions.getDatastreamGroup());
ClusterThroughputInfo clusterThroughputInfo = partitionThroughputInfo.get(clusterName);
ClusterThroughputInfo clusterThroughputInfo = partitionThroughputInfo.get(null);

// TODO Get task count estimate and perform elastic task count validation
// TODO Get task count estimate based on throughput and pick a winner
Original file line number Diff line number Diff line change
@@ -12,8 +12,6 @@
import org.slf4j.LoggerFactory;

import com.linkedin.datastream.common.zk.ZkClient;
import com.linkedin.datastream.server.DatastreamSourceClusterResolver;
import com.linkedin.datastream.server.DummyDatastreamSourceClusterResolver;
import com.linkedin.datastream.server.api.strategy.AssignmentStrategy;
import com.linkedin.datastream.server.providers.NoOpPartitionThroughputProvider;
import com.linkedin.datastream.server.providers.PartitionThroughputProvider;
@@ -40,9 +38,8 @@ public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties
}

PartitionThroughputProvider provider = constructPartitionThroughputProvider();
DatastreamSourceClusterResolver clusterResolver = constructDatastreamSourceClusterResolver();

return new LoadBasedPartitionAssignmentStrategy(provider, clusterResolver, _config.getMaxTasks(),
return new LoadBasedPartitionAssignmentStrategy(provider, _config.getMaxTasks(),
_config.getImbalanceThreshold(), _config.getMaxPartitions(), enableElasticTaskAssignment,
_config.getPartitionsPerTask(), _config.getPartitionFullnessThresholdPct(), _config.getTaskCapacityMBps(),
_config.getTaskCapacityUtilizationPct(), zkClient, _config.getCluster());
@@ -51,8 +48,4 @@ public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties
protected PartitionThroughputProvider constructPartitionThroughputProvider() {
return new NoOpPartitionThroughputProvider();
}

protected DatastreamSourceClusterResolver constructDatastreamSourceClusterResolver() {
return new DummyDatastreamSourceClusterResolver();
}
}
Original file line number Diff line number Diff line change
@@ -12,12 +12,14 @@
import java.util.HashMap;
import java.util.Iterator;

import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;

import com.linkedin.datastream.server.ClusterThroughputInfo;
import com.linkedin.datastream.server.DatastreamGroup;
import com.linkedin.datastream.server.PartitionThroughputInfo;


@@ -49,6 +51,11 @@ public ClusterThroughputInfo getThroughputInfo(String clusterName) {
return readThroughputInfoFromFile(partitionThroughputFile, clusterName);
}

/**
 * {@inheritDoc}
 * <p>Per-datastream-group throughput lookup is not supported by this file-based
 * provider; this method always throws {@link NotImplementedException}.
 */
@Override
public ClusterThroughputInfo getThroughputInfo(DatastreamGroup datastreamGroup) {
throw new NotImplementedException();
}

/**
* {@inheritDoc}
*/
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
import java.util.HashMap;

import com.linkedin.datastream.server.ClusterThroughputInfo;
import com.linkedin.datastream.server.DatastreamGroup;


/**
@@ -19,6 +20,11 @@ public ClusterThroughputInfo getThroughputInfo(String clusterName) {
return null;
}

/**
 * {@inheritDoc}
 * <p>No-op implementation: always returns {@code null} regardless of the
 * supplied datastream group. Callers must be prepared for a null result.
 */
@Override
public ClusterThroughputInfo getThroughputInfo(DatastreamGroup datastreamGroup) {
return null;
}

@Override
public HashMap<String, ClusterThroughputInfo> getThroughputInfo() {
return null;