diff --git a/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java b/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java index f22714db545e5..532976af3099d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java @@ -28,13 +28,18 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; -/** Base class for resources one can specify. */ +/** + * Base class for resources one can specify. Notice that the scale of value will be limited to + * {@link #MAX_VALUE_SCALE} and all the trailing zeros will be stripped for readability. + */ @Internal public abstract class Resource> implements Serializable, Comparable { private static final long serialVersionUID = 1L; + private static final int MAX_VALUE_SCALE = 8; + private final String name; private final BigDecimal value; @@ -45,11 +50,14 @@ protected Resource(String name, double value) { protected Resource(String name, BigDecimal value) { checkNotNull(value); + final BigDecimal valueRoundDown = + value.setScale(MAX_VALUE_SCALE, RoundingMode.DOWN).stripTrailingZeros(); checkArgument( - value.compareTo(BigDecimal.ZERO) >= 0, "Resource value must be no less than 0"); + valueRoundDown.compareTo(BigDecimal.ZERO) >= 0, + "Resource value must be no less than 0"); this.name = checkNotNull(name); - this.value = value; + this.value = valueRoundDown; } public T merge(T other) { @@ -80,7 +88,7 @@ public T multiply(int multiplier) { } public T divide(BigDecimal by) { - return create(value.divide(by, 16, RoundingMode.DOWN)); + return create(value.divide(by, MAX_VALUE_SCALE, RoundingMode.DOWN)); } public T divide(int by) { diff --git a/flink-core/src/test/java/org/apache/flink/api/common/resources/ResourceTest.java b/flink-core/src/test/java/org/apache/flink/api/common/resources/ResourceTest.java index c0b2fcd95b17e..3f5bcccc036ec 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/resources/ResourceTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/resources/ResourceTest.java @@ -205,6 +205,22 @@ public void testCompareToFailDifferentName() { resource1.compareTo(resource2); } + /** This test assume that the scale limitation is 8. */ + @Test + public void testValueScaleLimited() { + final Resource v1 = new TestResource(0.100000001); + assertTestResourceValueEquals(0.1, v1); + + final Resource v2 = new TestResource(1.0).divide(3); + assertTestResourceValueEquals(0.33333333, v2); + } + + @Test + public void testStripTrailingZeros() { + final Resource v = new TestResource(0.25).multiply(2); + assertThat(v.getValue().toString(), is("0.5")); + } + private static void assertTestResourceValueEquals(final double value, final Resource resource) { assertEquals(new TestResource(value), resource); } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java index 8868e869c43b8..4a15ec5114458 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java @@ -362,7 +362,12 @@ protected void validateRequestedResources( .getMebiBytes()))); assertThat( resourceRequirements.getRequests().get(Constants.RESOURCE_NAME_CPU).getAmount(), - is(String.valueOf(taskExecutorProcessSpec.getCpuCores().getValue()))); + is( + String.valueOf( + taskExecutorProcessSpec + .getCpuCores() + .getValue() + .doubleValue()))); assertThat( resourceRequirements @@ -376,7 +381,12 @@ protected void validateRequestedResources( .getMebiBytes()))); assertThat( resourceRequirements.getLimits().get(Constants.RESOURCE_NAME_CPU).getAmount(), - is(String.valueOf(taskExecutorProcessSpec.getCpuCores().getValue()))); + is( + String.valueOf( + taskExecutorProcessSpec + .getCpuCores() + .getValue() + .doubleValue()))); } @Override