Skip to content

Commit

Permalink
Make Default byte in rate and Msg in rate configurable (linkedin#864)
Browse files Browse the repository at this point in the history
The default values set may need to be changed based on the specific scenario. Specifically, this brings a dependency that if the default throughput rate per task in 5MB and the default is 5KB, it implies that a task can only process 1000 partitions. Making this a configurable property will make it customizable based on the requirement.

Note: This change does not cover config at a datastream level. We can add it based on the requirement.
  • Loading branch information
vmaheshw authored and Vaibhav Maheshwari committed Mar 1, 2022
1 parent 32689df commit 1189a54
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ public class LoadBasedPartitionAssigner implements MetricsAware {

private final Map<String, LoadBasedPartitionAssigner.PartitionAssignmentStats> _partitionAssignmentStatsMap =
new ConcurrentHashMap<>();
private final int _defaultPartitionBytesInKBRate;
private final int _defaultPartitionMsgsInRate;

/**
* Constructor of LoadBasedPartitionAssigner
* @param defaultPartitionBytesInKBRate default bytesIn rate in KB for partition
* @param defaultPartitionMsgsInRate default msgsIn rate in KB for partition
*/
public LoadBasedPartitionAssigner(int defaultPartitionBytesInKBRate, int defaultPartitionMsgsInRate) {
_defaultPartitionBytesInKBRate = defaultPartitionBytesInKBRate;
_defaultPartitionMsgsInRate = defaultPartitionMsgsInRate;
}

/**
* Performs partition assignment based on partition throughput information.
* <p>
Expand Down Expand Up @@ -90,9 +103,9 @@ public Map<String, Set<DatastreamTask>> assignPartitions(

// sort the current assignment's tasks on total throughput
Map<String, Integer> taskThroughputMap = new HashMap<>();
PartitionThroughputInfo defaultPartitionInfo = new PartitionThroughputInfo(
LoadBasedPartitionAssignmentStrategyConfig.DEFAULT_PARTITION_BYTES_IN_KB_RATE,
LoadBasedPartitionAssignmentStrategyConfig.DEFAULT_PARTITION_MESSAGES_IN_RATE, "");
PartitionThroughputInfo defaultPartitionInfo = new PartitionThroughputInfo(_defaultPartitionBytesInKBRate,
_defaultPartitionMsgsInRate, "");

newPartitions.forEach((task, partitions) -> {
int totalThroughput = partitions.stream()
.mapToInt(p -> partitionInfoMap.getOrDefault(p, defaultPartitionInfo).getBytesInKBRate())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class LoadBasedPartitionAssignmentStrategy extends StickyPartitionAssignm
private final boolean _enableThroughputBasedPartitionAssignment;
private final boolean _enablePartitionNumBasedTaskCountEstimation;
private final LoadBasedPartitionAssigner _assigner;
private final int _defaultPartitionBytesInKBRate;
private final int _defaultPartitionMsgsInRate;

/**
* Creates an instance of {@link LoadBasedPartitionAssignmentStrategy}
Expand All @@ -63,7 +65,8 @@ public LoadBasedPartitionAssignmentStrategy(PartitionThroughputProvider throughp
int imbalanceThreshold, int maxPartitionPerTask, boolean enableElasticTaskAssignment, int partitionsPerTask,
int partitionFullnessFactorPct, int taskCapacityMBps, int taskCapacityUtilizationPct,
int throughputInfoFetchTimeoutMs, int throughputInfoFetchRetryPeriodMs, ZkClient zkClient, String clusterName,
boolean enableThroughputBasedPartitionAssignment, boolean enablePartitionNumBasedTaskCountEstimation) {
boolean enableThroughputBasedPartitionAssignment, boolean enablePartitionNumBasedTaskCountEstimation,
int defaultPartitionBytesInKBRate, int defaultPartitionMsgsInRate) {
super(maxTasks, imbalanceThreshold, maxPartitionPerTask, enableElasticTaskAssignment, partitionsPerTask,
partitionFullnessFactorPct, zkClient, clusterName);
_throughputProvider = throughputProvider;
Expand All @@ -73,13 +76,15 @@ public LoadBasedPartitionAssignmentStrategy(PartitionThroughputProvider throughp
_throughputInfoFetchRetryPeriodMs = throughputInfoFetchRetryPeriodMs;
_enableThroughputBasedPartitionAssignment = enableThroughputBasedPartitionAssignment;
_enablePartitionNumBasedTaskCountEstimation = enablePartitionNumBasedTaskCountEstimation;
_defaultPartitionBytesInKBRate = defaultPartitionBytesInKBRate;
_defaultPartitionMsgsInRate = defaultPartitionMsgsInRate;

LOG.info("Task capacity : {}MBps, task capacity utilization : {}%, Throughput info fetch timeout : {} ms, "
+ "throughput info fetch retry period : {} ms, throughput based partition assignment : {}, "
+ "partition num based task count estimation : {}", _taskCapacityMBps, _taskCapacityUtilizationPct,
_throughputInfoFetchTimeoutMs, _throughputInfoFetchRetryPeriodMs, _enableThroughputBasedPartitionAssignment ?
"enabled" : "disabled", _enablePartitionNumBasedTaskCountEstimation ? "enabled" : "disabled");
_assigner = new LoadBasedPartitionAssigner();
_assigner = new LoadBasedPartitionAssigner(defaultPartitionBytesInKBRate, defaultPartitionMsgsInRate);
}

/**
Expand Down Expand Up @@ -138,7 +143,8 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<Datastr
numTasksNeeded = numTasksEstimateBasedOnPartitionCount;
}

LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(_taskCapacityMBps, _taskCapacityUtilizationPct);
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(_taskCapacityMBps, _taskCapacityUtilizationPct,
_defaultPartitionBytesInKBRate, _defaultPartitionMsgsInRate);
int numTasksEstimateBasedOnLoad = estimator.getTaskCount(clusterThroughputInfo, assignedPartitions,
unassignedPartitions, datastreamGroupName);
numTasksNeeded = Math.max(numTasksNeeded, numTasksEstimateBasedOnLoad);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,23 @@
*/
public class LoadBasedPartitionAssignmentStrategyConfig extends PartitionAssignmentStrategyConfig {

public static final int DEFAULT_PARTITION_BYTES_IN_KB_RATE = 5;
public static final int DEFAULT_PARTITION_MESSAGES_IN_RATE = 5;

public static final String CFG_THROUGHPUT_INFO_FETCH_TIMEOUT_MS = "throughputInfoFetchTimeoutMs";
public static final String CFG_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS = "throughputInfoFetchRetryPeriodMs";
public static final String CFG_TASK_CAPACITY_MBPS = "taskCapacityMBps";
public static final String CFG_TASK_CAPACITY_UTILIZATION_PCT = "taskCapacityUtilizationPct";
public static final String CFG_ENABLE_THROUGHPUT_BASED_PARTITION_ASSIGNMENT = "enableThroughputBasedPartitionAssignment";
public static final String CFG_ENABLE_PARTITION_NUM_BASED_TASK_COUNT_ESTIMATION = "enablePartitionNumBasedTaskCountEstimation";
public static final String CFG_DEFAULT_PARTITION_BYTES_IN_KB_RATE = "defaultPartitionBytesInKBRate";
public static final String CFG_DEFAULT_PARTITION_MSGS_IN_RATE = "defaultPartitionMsgsInRate";

private static final int DEFAULT_THROUGHPUT_INFO_FETCH_TIMEOUT_MS = (int) Duration.ofSeconds(10).toMillis();
private static final int DEFAULT_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS = (int) Duration.ofSeconds(1).toMillis();
private static final int DEFAULT_TASK_CAPACITY_MBPS = 4;
private static final int DEFAULT_TASK_CAPACITY_UTILIZATION_PCT = 90;
private static final boolean DEFAULT_ENABLE_THROUGHPUT_BASED_PARTITION_ASSIGNMENT = false;
private static final boolean DEFAULT_ENABLE_PARTITION_NUM_BASED_TASK_COUNT_ESTIMATION = false;
private static final int DEFAULT_PARTITION_BYTES_IN_KB_RATE = 5;
private static final int DEFAULT_PARTITION_MSGS_IN_RATE = 5;


private final int _taskCapacityMBps;
Expand All @@ -39,6 +40,9 @@ public class LoadBasedPartitionAssignmentStrategyConfig extends PartitionAssignm
private final int _throughputInfoFetchRetryPeriodMs;
private final boolean _enableThroughputBasedPartitionAssignment;
private final boolean _enablePartitionNumBasedTaskCountEstimation;
private final int _defaultPartitionBytesInKBRate;
private final int _defaultPartitionMsgsInRate;

/**
* Creates an instance of {@link LoadBasedPartitionAssignmentStrategyConfig}
* @param config Config properties
Expand All @@ -54,7 +58,8 @@ public LoadBasedPartitionAssignmentStrategyConfig(Properties config) {
DEFAULT_ENABLE_THROUGHPUT_BASED_PARTITION_ASSIGNMENT);
_enablePartitionNumBasedTaskCountEstimation = props.getBoolean(CFG_ENABLE_PARTITION_NUM_BASED_TASK_COUNT_ESTIMATION,
DEFAULT_ENABLE_PARTITION_NUM_BASED_TASK_COUNT_ESTIMATION);

_defaultPartitionBytesInKBRate = props.getInt(CFG_DEFAULT_PARTITION_BYTES_IN_KB_RATE, DEFAULT_PARTITION_BYTES_IN_KB_RATE);
_defaultPartitionMsgsInRate = props.getInt(CFG_DEFAULT_PARTITION_MSGS_IN_RATE, DEFAULT_PARTITION_MSGS_IN_RATE);
}

/**
Expand Down Expand Up @@ -104,4 +109,12 @@ public boolean isEnableThroughputBasedPartitionAssignment() {
public boolean isEnablePartitionNumBasedTaskCountEstimation() {
return _enablePartitionNumBasedTaskCountEstimation;
}

public int getDefaultPartitionBytesInKBRate() {
return _defaultPartitionBytesInKBRate;
}

public int getDefaultPartitionMsgsInRate() {
return _defaultPartitionMsgsInRate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties

PartitionThroughputProvider provider = constructPartitionThroughputProvider();

//TODO: Directly pass the config object.
return new LoadBasedPartitionAssignmentStrategy(provider, _config.getMaxTasks(),
_config.getImbalanceThreshold(), _config.getMaxPartitions(), enableElasticTaskAssignment,
_config.getPartitionsPerTask(), _config.getPartitionFullnessThresholdPct(), config.getTaskCapacityMBps(),
config.getTaskCapacityUtilizationPct(), config.getThroughputInfoFetchTimeoutMs(),
config.getThroughputInfoFetchRetryPeriodMs(), zkClient, _config.getCluster(),
config.isEnableThroughputBasedPartitionAssignment(), config.isEnablePartitionNumBasedTaskCountEstimation());
config.isEnableThroughputBasedPartitionAssignment(), config.isEnablePartitionNumBasedTaskCountEstimation(),
config.getDefaultPartitionBytesInKBRate(), config.getDefaultPartitionMsgsInRate());
}

protected PartitionThroughputProvider constructPartitionThroughputProvider() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,22 @@ public class LoadBasedTaskCountEstimator {
private static final Logger LOG = LoggerFactory.getLogger(LoadBasedTaskCountEstimator.class.getName());
private final int _taskCapacityMBps;
private final int _taskCapacityUtilizationPct;
private final int _defaultPartitionBytesInKBRate;
private final int _defaultPartitionMsgsInRate;

/**
* Creates an instance of {@link LoadBasedTaskCountEstimator}
* @param taskCapacityMBps Task capacity in MB/sec
* @param taskCapacityUtilizationPct Task capacity utilization percentage
* @param defaultPartitionBytesInKBRate Default bytesIn rate in KB for partition
* @param defaultPartitionMsgsInRate Default msgsIn rate for partition
*/
public LoadBasedTaskCountEstimator(int taskCapacityMBps, int taskCapacityUtilizationPct) {
public LoadBasedTaskCountEstimator(int taskCapacityMBps, int taskCapacityUtilizationPct,
int defaultPartitionBytesInKBRate, int defaultPartitionMsgsInRate) {
_taskCapacityMBps = taskCapacityMBps;
_taskCapacityUtilizationPct = taskCapacityUtilizationPct;
_defaultPartitionBytesInKBRate = defaultPartitionBytesInKBRate;
_defaultPartitionMsgsInRate = defaultPartitionMsgsInRate;
}

/**
Expand All @@ -61,9 +68,9 @@ public int getTaskCount(ClusterThroughputInfo throughputInfo, List<String> assig
Set<String> allPartitions = new HashSet<>(assignedPartitions);
allPartitions.addAll(unassignedPartitions);

PartitionThroughputInfo defaultThroughputInfo = new PartitionThroughputInfo(
LoadBasedPartitionAssignmentStrategyConfig.DEFAULT_PARTITION_BYTES_IN_KB_RATE,
LoadBasedPartitionAssignmentStrategyConfig.DEFAULT_PARTITION_MESSAGES_IN_RATE, "");
PartitionThroughputInfo defaultThroughputInfo = new PartitionThroughputInfo(_defaultPartitionBytesInKBRate,
_defaultPartitionMsgsInRate, "");

// total throughput in KB/sec
int totalThroughput = allPartitions.stream()
.map(p -> throughputMap.getOrDefault(p, defaultThroughputInfo))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class TestLoadBasedTaskCountEstimator {
private static final String THROUGHPUT_FILE_NAME = "partitionThroughput.json";
private static final int TASK_CAPACITY_MBPS = 4;
private static final int TASK_CAPACITY_UTILIZATION_PCT = 90;
private static final int DEFAULT_BYTES_IN_KB_RATE = 5;
private static final int DEFAULT_MSGS_IN_RATE = 5;

private FileBasedPartitionThroughputProvider _provider;

Expand All @@ -43,7 +45,7 @@ public void emptyAssignmentReturnsZeroTasksTest() {
List<String> assignedPartitions = Collections.emptyList();
List<String> unassignedPartitions = Collections.emptyList();
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Assert.assertEquals(taskCount, 0);
}
Expand All @@ -55,7 +57,7 @@ public void lowThroughputAssignmentReturnsOneTaskTest() {
assignedPartitions.add("Pepperoni-1");
List<String> unassignedPartitions = Collections.emptyList();
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Assert.assertEquals(taskCount, 1);
}
Expand All @@ -66,7 +68,7 @@ public void highThroughputAssignmentTest() {
List<String> assignedPartitions = Collections.emptyList();
List<String> unassignedPartitions = new ArrayList<>(throughputInfo.getPartitionInfoMap().keySet());
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");

int throughputSum = throughputInfo.getPartitionInfoMap().values().stream().mapToInt(
Expand All @@ -82,7 +84,7 @@ public void highThroughputAssignmentTest2() {
List<String> assignedPartitions = Collections.emptyList();
List<String> unassignedPartitions = new ArrayList<>(throughputInfo.getPartitionInfoMap().keySet());
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Assert.assertEquals(taskCount, unassignedPartitions.size());
}
Expand All @@ -93,7 +95,7 @@ public void partitionsHaveDefaultWeightTest() {
List<String> assignedPartitions = Collections.emptyList();
List<String> unassignedPartitions = Arrays.asList("P1", "P2");
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Assert.assertTrue(taskCount > 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void assignFromScratchTest() {
DatastreamGroupPartitionsMetadata metadata = new DatastreamGroupPartitionsMetadata(new DatastreamGroup(
Collections.singletonList(ds1)), Arrays.asList("P1", "P2", "P3"));

LoadBasedPartitionAssigner assigner = new LoadBasedPartitionAssigner();
LoadBasedPartitionAssigner assigner = new LoadBasedPartitionAssigner(5, 10);
Map<String, Set<DatastreamTask>> newAssignment =
assigner.assignPartitions(throughputInfo, currentAssignment, unassignedPartitions, metadata, Integer.MAX_VALUE);

Expand Down Expand Up @@ -120,9 +120,9 @@ public void newAssignmentRetainsTasksFromOtherDatastreamsTest() {
currentAssignment.put("instance2", new HashSet<>(Collections.singletonList(datastream1Task2)));

DatastreamGroupPartitionsMetadata metadata = new DatastreamGroupPartitionsMetadata(new DatastreamGroup(
Collections.singletonList(ds2)), Arrays.asList("P3"));
Collections.singletonList(ds2)), Collections.singletonList("P3"));

LoadBasedPartitionAssigner assigner = new LoadBasedPartitionAssigner();
LoadBasedPartitionAssigner assigner = new LoadBasedPartitionAssigner(5, 10);
Map<String, Set<DatastreamTask>> newAssignment = assigner.assignPartitions(throughputInfo, currentAssignment,
unassignedPartitions, metadata, Integer.MAX_VALUE);

Expand Down Expand Up @@ -166,7 +166,7 @@ public void assignmentDistributesPartitionsWhenThroughputInfoIsMissingTest() {
DatastreamGroupPartitionsMetadata metadata = new DatastreamGroupPartitionsMetadata(new DatastreamGroup(
Collections.singletonList(ds1)), Arrays.asList("P1", "P2", "P3", "P4"));

LoadBasedPartitionAssigner assigner = new LoadBasedPartitionAssigner();
LoadBasedPartitionAssigner assigner = new LoadBasedPartitionAssigner(5, 10);
Map<String, Set<DatastreamTask>> newAssignment = assigner.assignPartitions(throughputInfo, currentAssignment,
unassignedPartitions, metadata, Integer.MAX_VALUE);

Expand All @@ -187,7 +187,7 @@ public void assignmentDistributesPartitionsWhenThroughputInfoIsMissingTest() {

@Test
public void lightestTaskGetsNewPartitionTest() {
List<String> unassignedPartitions = Arrays.asList("P4");
List<String> unassignedPartitions = Collections.singletonList("P4");
Map<String, PartitionThroughputInfo> throughputInfoMap = new HashMap<>();
throughputInfoMap.put("P1", new PartitionThroughputInfo(5, 5, "P1"));
throughputInfoMap.put("P2", new PartitionThroughputInfo(5, 5, "P2"));
Expand All @@ -207,7 +207,7 @@ public void lightestTaskGetsNewPartitionTest() {
DatastreamGroupPartitionsMetadata metadata = new DatastreamGroupPartitionsMetadata(new DatastreamGroup(
Collections.singletonList(ds1)), Arrays.asList("P1", "P2", "P3", "P4"));

LoadBasedPartitionAssigner assigner = new LoadBasedPartitionAssigner();
LoadBasedPartitionAssigner assigner = new LoadBasedPartitionAssigner(5, 10);
Map<String, Set<DatastreamTask>> newAssignment = assigner.assignPartitions(throughputInfo, currentAssignment,
unassignedPartitions, metadata, Integer.MAX_VALUE);

Expand Down Expand Up @@ -236,15 +236,15 @@ public void throwsExceptionWhenNotEnoughRoomForAllPartitionsTest() {
DatastreamGroupPartitionsMetadata metadata = new DatastreamGroupPartitionsMetadata(new DatastreamGroup(
Collections.singletonList(ds1)), Arrays.asList("P1", "P2", "P3", "P4", "P5"));

LoadBasedPartitionAssigner assigner = new LoadBasedPartitionAssigner();
LoadBasedPartitionAssigner assigner = new LoadBasedPartitionAssigner(5, 10);
int maxPartitionsPerTask = 2;
Assert.assertThrows(DatastreamRuntimeException.class, () -> assigner.assignPartitions(throughputInfo,
currentAssignment, unassignedPartitions, metadata, maxPartitionsPerTask));
}

@Test
public void taskWithRoomGetsNewPartitionTest() {
List<String> unassignedPartitions = Arrays.asList("P4");
List<String> unassignedPartitions = Collections.singletonList("P4");
Map<String, PartitionThroughputInfo> throughputInfoMap = new HashMap<>();
throughputInfoMap.put("P1", new PartitionThroughputInfo(5, 5, "P1"));
throughputInfoMap.put("P2", new PartitionThroughputInfo(5, 5, "P2"));
Expand All @@ -264,7 +264,7 @@ public void taskWithRoomGetsNewPartitionTest() {
DatastreamGroupPartitionsMetadata metadata = new DatastreamGroupPartitionsMetadata(new DatastreamGroup(
Collections.singletonList(ds1)), Arrays.asList("P1", "P2", "P3", "P4"));

LoadBasedPartitionAssigner assigner = new LoadBasedPartitionAssigner();
LoadBasedPartitionAssigner assigner = new LoadBasedPartitionAssigner(5, 10);
int maxPartitionsPerTask = 2;
Map<String, Set<DatastreamTask>> newAssignment = assigner.assignPartitions(throughputInfo, currentAssignment,
unassignedPartitions, metadata, maxPartitionsPerTask);
Expand All @@ -278,7 +278,7 @@ public void taskWithRoomGetsNewPartitionTest() {

@Test
public void findTaskWithRoomForAPartitionTests() {
LoadBasedPartitionAssigner assigner = new LoadBasedPartitionAssigner();
LoadBasedPartitionAssigner assigner = new LoadBasedPartitionAssigner(5, 10);
List<String> tasks = Arrays.asList("T1", "T2");
Map<String, Set<String>> partitionsMap = new HashMap<>();
partitionsMap.put("T1", new HashSet<>(Collections.emptySet()));
Expand Down
Loading

0 comments on commit 1189a54

Please sign in to comment.