From 1687244ff0e9d12dd0d864136f9b39623f0446bc Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Tue, 18 Jul 2023 11:43:51 +0800 Subject: [PATCH] [improve][offload] Extend the offload policies to allow specifying more conf (#20804) ### Motivation The offload policies have limited the configurations for the offloaders. That means if the offloader needs more configurations, we need to extend more fields in the OffloadPoliciesImpl. That doesn't make sense. We should make it extendable easily. Add a configuration map support to allow it to set more configurations. --- .../policies/data/OffloadPoliciesImpl.java | 14 ++++++ .../pulsar/common/util/FieldParser.java | 9 ++++ .../policies/data/OffloadPoliciesTest.java | 13 ++++++ .../pulsar/common/util/FieldParserTest.java | 45 +++++++++++++++++++ 4 files changed, 81 insertions(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java index 843c1bde3b912..f9148ba8699fd 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; import lombok.Data; @@ -118,6 +119,9 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { @Configuration @JsonProperty(access = JsonProperty.Access.READ_WRITE) private OffloadedReadPriority managedLedgerOffloadedReadPriority = DEFAULT_OFFLOADED_READ_PRIORITY; + @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) + private Map managedLedgerExtraConfigurations = null; // s3 config, set by service configuration or cli @Configuration @@ -257,6 +261,14 @@ public static OffloadPoliciesImpl create(Properties properties) { } } }); + Map extraConfigurations = properties.entrySet().stream() + .filter(entry -> entry.getKey().toString().startsWith("managedLedgerOffloadExtraConfig")) + .collect(Collectors.toMap( + entry -> entry.getKey().toString().replaceFirst("managedLedgerOffloadExtraConfig", ""), + entry -> entry.getValue().toString())); + + data.setManagedLedgerExtraConfigurations(extraConfigurations); + data.compatibleWithBrokerConfigFile(properties); return data; } @@ -347,6 +359,8 @@ public Properties toProperties() { this.getManagedLedgerOffloadThresholdInSeconds()); setProperty(properties, "managedLedgerOffloadDeletionLagInMillis", this.getManagedLedgerOffloadDeletionLagInMillis()); + setProperty(properties, "managedLedgerOffloadExtraConfigurations", + this.getManagedLedgerExtraConfigurations()); if (this.isS3Driver()) { setProperty(properties, "s3ManagedLedgerOffloadRegion", diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java index 626a14b92eedd..8d1ae5294ff7b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java @@ -314,6 +314,9 @@ public static Float stringToFloat(String val) { * @return The converted list with type {@code }. */ public static List stringToList(String val, Class type) { + if (val == null) { + return null; + } String[] tokens = trim(val).split(","); return Arrays.stream(tokens).map(t -> { return convert(trim(t), type); @@ -330,6 +333,9 @@ public static List stringToList(String val, Class type) { * @return The converted set with type {@code }. */ public static Set stringToSet(String val, Class type) { + if (val == null) { + return null; + } String[] tokens = trim(val).split(","); return Arrays.stream(tokens).map(t -> { return convert(trim(t), type); @@ -337,6 +343,9 @@ public static Set stringToSet(String val, Class type) { } private static Map stringToMap(String strValue, Class keyType, Class valueType) { + if (strValue == null) { + return null; + } String[] tokens = trim(strValue).split(","); Map map = new HashMap<>(); for (String token : tokens) { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java index 00b9aab0b1591..d79d2c32ffa7f 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java @@ -26,6 +26,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Map; import java.util.Properties; import org.testng.Assert; import org.testng.annotations.Test; @@ -432,4 +433,16 @@ private byte[] loadClassData(String name) throws IOException { } } + @Test + public void testCreateOffloadPoliciesWithExtraConfiguration() { + Properties properties = new Properties(); + properties.put("managedLedgerOffloadExtraConfigKey1", "value1"); + properties.put("managedLedgerOffloadExtraConfigKey2", "value2"); + OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties); + + Map extraConfigurations = policies.getManagedLedgerExtraConfigurations(); + Assert.assertEquals(extraConfigurations.size(), 2); + Assert.assertEquals(extraConfigurations.get("Key1"), "value1"); + Assert.assertEquals(extraConfigurations.get("Key2"), "value2"); + } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java index e90b6cbc4a13a..b24e9ae40822a 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java @@ -19,12 +19,15 @@ package org.apache.pulsar.common.util; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.testng.annotations.Test; @@ -94,4 +97,46 @@ public static class MyConfig { public Set stringSet; } + @Test + public void testNullStrValue() throws Exception { + class TestMap { + public List list; + public Set set; + public Map map; + public Optional optional; + } + + Field listField = TestMap.class.getField("list"); + Object listValue = FieldParser.value(null, listField); + assertNull(listValue); + + listValue = FieldParser.value("null", listField); + assertTrue(listValue instanceof List); + assertEquals(((List) listValue).size(), 1); + assertEquals(((List) listValue).get(0), "null"); + + + Field setField = TestMap.class.getField("set"); + Object setValue = FieldParser.value(null, setField); + assertNull(setValue); + + setValue = FieldParser.value("null", setField); + assertTrue(setValue instanceof Set); + assertEquals(((Set) setValue).size(), 1); + assertEquals(((Set) setValue).iterator().next(), "null"); + + Field mapField = TestMap.class.getField("map"); + Object mapValue = FieldParser.value(null, mapField); + assertNull(mapValue); + + try { + FieldParser.value("null", mapField); + } catch (IllegalArgumentException iae) { + assertTrue(iae.getMessage().contains("null map-value is not in correct format key1=value,key2=value2")); + } + + Field optionalField = TestMap.class.getField("optional"); + Object optionalValue = FieldParser.value(null, optionalField); + assertEquals(optionalValue, Optional.empty()); + } }