From a7710f3741b045a42145f31604a6aa1d038864c4 Mon Sep 17 00:00:00 2001 From: Jesus Zuniga Date: Wed, 3 Jan 2024 15:07:23 -0800 Subject: [PATCH 1/8] add fleet level decider logic --- .../com/pinterest/singer/config/Decider.java | 6 ++++++ .../processor/DefaultLogStreamProcessor.java | 11 +++++++++++ .../com/pinterest/singer/utils/SingerUtils.java | 17 +++++++++++++++++ 3 files changed, 34 insertions(+) diff --git a/singer/src/main/java/com/pinterest/singer/config/Decider.java b/singer/src/main/java/com/pinterest/singer/config/Decider.java index 5a07e445..c5d56063 100644 --- a/singer/src/main/java/com/pinterest/singer/config/Decider.java +++ b/singer/src/main/java/com/pinterest/singer/config/Decider.java @@ -16,6 +16,7 @@ package com.pinterest.singer.config; import com.pinterest.singer.utils.HashUtils; +import com.pinterest.singer.utils.SingerUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; @@ -103,6 +104,11 @@ public Map getDeciderMap() { return mDeciderMap; } + public static String generateDisableDecider(String logName) { + return "singer_disable_" + logName + "___" + + SingerUtils.getHostnamePrefix().replace('-', '_') + "___decider"; + } + /** * Looks up the value of the decider variable named {@code deciderName} and flips a coin to * determine if we should be in the experiment based on the specified ID. Useful if a stable diff --git a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java index c2aada34..efb44ee5 100644 --- a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java +++ b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java @@ -68,6 +68,9 @@ public class DefaultLogStreamProcessor implements LogStreamProcessor, Runnable { // Decider for the log stream. private final String logDecider; + // Decider used in conjunction with logDecider to disable the logstream at a fleet level + private final String fleetDisableDecider; + // LogStream to be processed. protected final LogStream logStream; @@ -165,6 +168,9 @@ public DefaultLogStreamProcessor( this.exceedTimeSliceLimit = false; this.lastModificationTimeProcessed = new AtomicLong(-1); this.lastCompletedCycleTime = new AtomicLong(-1); + this.fleetDisableDecider = + Decider.generateDisableDecider(logStream.getSingerLog().getSingerLogConfig().getName() + .replaceAll("[^a-zA-Z0-9]", "_")); } @Override @@ -252,6 +258,11 @@ boolean isLoggingAllowedByDecider() { if (map.containsKey(logDecider)) { result = map.get(logDecider) != 0; } + if (result && map.containsKey(fleetDisableDecider) && map.get(fleetDisableDecider) == 100) { + LOG.info("Disabling log stream {} because fleet disable decider {} is set to 100", logStream.getLogStreamName(), + fleetDisableDecider); + result = false; + } } return result; } diff --git a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java index 9a481a2f..f6bd07df 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java @@ -91,6 +91,23 @@ public static String getHostname() { return hostName; } + /** + * @return the prefix of the host name + */ + public static String getHostnamePrefix() { + String hostNameStringPrefix = "null"; + if (HOSTNAME != null) { + String hostNameToSplit = ""; + String[] substrs = HOSTNAME.split("\\d+"); + if (substrs.length > 0) { + hostNameToSplit = substrs[0]; + } + int dashIndex = hostNameToSplit.lastIndexOf('-'); + hostNameStringPrefix = dashIndex == -1 ? hostNameToSplit : hostNameToSplit.substring(0, dashIndex); + } + return hostNameStringPrefix; + } + public static Path getPath(String filePathStr) { return defaultFileSystem.getPath(filePathStr); } From 79fafc999742b4e321a5fbdefe83ca5b57015d0d Mon Sep 17 00:00:00 2001 From: Jesus Zuniga Date: Fri, 12 Jan 2024 16:56:53 -0800 Subject: [PATCH 2/8] change logic to avoid computing hostname prefix --- .../com/pinterest/singer/config/Decider.java | 33 +++++++++++++-- .../processor/DefaultLogStreamProcessor.java | 16 +++---- .../pinterest/singer/utils/SingerUtils.java | 22 +++------- .../DefaultLogStreamProcessorTest.java | 42 +++++++++++++++++++ 4 files changed, 85 insertions(+), 28 deletions(-) diff --git a/singer/src/main/java/com/pinterest/singer/config/Decider.java b/singer/src/main/java/com/pinterest/singer/config/Decider.java index c5d56063..7ae633f6 100644 --- a/singer/src/main/java/com/pinterest/singer/config/Decider.java +++ b/singer/src/main/java/com/pinterest/singer/config/Decider.java @@ -15,6 +15,7 @@ */ package com.pinterest.singer.config; +import com.pinterest.singer.common.SingerSettings; import com.pinterest.singer.utils.HashUtils; import com.pinterest.singer.utils.SingerUtils; @@ -31,8 +32,12 @@ import java.io.IOException; import java.math.BigInteger; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; /** * Basic Decider Framework. @@ -104,9 +109,31 @@ public Map getDeciderMap() { return mDeciderMap; } - public static String generateDisableDecider(String logName) { - return "singer_disable_" + logName + "___" - + SingerUtils.getHostnamePrefix().replace('-', '_') + "___decider"; + /*** + * Given a log name, return the decider name that is used to disable the log. The disable decider + * name is required to be in the format of "singer_disable_logName___HOSTNAMEPREFIX___decider". + * Additionally, if there are multiple deciders that match this format , the one with the largest + * character count will be returned. + * + * @param logName + * @return the disable decider name if it exists, null otherwise + */ + public String getDisableDecider(String logName) { + LOG.warn("Calling get disable decider"); + Set disableDeciderList = new HashSet<>(); + String deciderName = null; + for (String key : mDeciderMap.keySet()) { + if (key.startsWith("singer_disable_" + logName + "___")) { + disableDeciderList.add(key); + } + } + for (String decider : disableDeciderList) { + if (SingerUtils.HOSTNAME.startsWith(decider.split("___")[1])) { + deciderName = + deciderName != null && deciderName.length() > decider.length() ? deciderName : decider; + } + } + return deciderName; } /** diff --git a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java index efb44ee5..79207ff9 100644 --- a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java +++ b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java @@ -69,7 +69,7 @@ public class DefaultLogStreamProcessor implements LogStreamProcessor, Runnable { private final String logDecider; // Decider used in conjunction with logDecider to disable the logstream at a fleet level - private final String fleetDisableDecider; + private final String logstreamNameForDecider; // LogStream to be processed. protected final LogStream logStream; @@ -168,9 +168,8 @@ public DefaultLogStreamProcessor( this.exceedTimeSliceLimit = false; this.lastModificationTimeProcessed = new AtomicLong(-1); this.lastCompletedCycleTime = new AtomicLong(-1); - this.fleetDisableDecider = - Decider.generateDisableDecider(logStream.getSingerLog().getSingerLogConfig().getName() - .replaceAll("[^a-zA-Z0-9]", "_")); + this.logstreamNameForDecider = + logStream.getSingerLog().getSingerLogConfig().getName().replaceAll("[^a-zA-Z0-9]", "_"); } @Override @@ -247,7 +246,8 @@ public long processLogStream() throws LogStreamProcessorException { /** * If the decider is not set, this method will return true. - * If a decider is set, only return false when the decider's value is 0. + * If a decider is set, only return false when the decider's value is 0 and disable decider's + * (if exists) value is 100. * * @return true or false. */ @@ -258,9 +258,9 @@ boolean isLoggingAllowedByDecider() { if (map.containsKey(logDecider)) { result = map.get(logDecider) != 0; } - if (result && map.containsKey(fleetDisableDecider) && map.get(fleetDisableDecider) == 100) { - LOG.info("Disabling log stream {} because fleet disable decider {} is set to 100", logStream.getLogStreamName(), - fleetDisableDecider); + String disableDecider = Decider.getInstance().getDisableDecider(logstreamNameForDecider); + if (result && disableDecider != null && map.get(disableDecider) == 100) { + LOG.info("Disabling log stream {} because fleet disable decider is set to 100", logStream.getLogStreamName()); result = false; } } diff --git a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java index f6bd07df..317cacaf 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java @@ -38,6 +38,7 @@ import java.util.Comparator; import java.util.Date; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.io.comparator.LastModifiedFileComparator; import org.apache.commons.io.comparator.NameFileComparator; @@ -91,23 +92,6 @@ public static String getHostname() { return hostName; } - /** - * @return the prefix of the host name - */ - public static String getHostnamePrefix() { - String hostNameStringPrefix = "null"; - if (HOSTNAME != null) { - String hostNameToSplit = ""; - String[] substrs = HOSTNAME.split("\\d+"); - if (substrs.length > 0) { - hostNameToSplit = substrs[0]; - } - int dashIndex = hostNameToSplit.lastIndexOf('-'); - hostNameStringPrefix = dashIndex == -1 ? hostNameToSplit : hostNameToSplit.substring(0, dashIndex); - } - return hostNameStringPrefix; - } - public static Path getPath(String filePathStr) { return defaultFileSystem.getPath(filePathStr); } @@ -360,5 +344,9 @@ public static void deleteRecursively(File baseDir) { } } } + @VisibleForTesting + public static void setHostname(String hostname) { + HOSTNAME = hostname; + } } diff --git a/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java b/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java index a2aa2719..2943ff31 100644 --- a/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java +++ b/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java @@ -37,6 +37,7 @@ import com.pinterest.singer.thrift.configuration.SingerLogConfig; import com.pinterest.singer.thrift.configuration.ThriftReaderConfig; import com.pinterest.singer.utils.SimpleThriftLogger; +import com.pinterest.singer.utils.SingerUtils; import com.pinterest.singer.utils.WatermarkUtils; import com.google.common.collect.ImmutableMap; @@ -47,6 +48,7 @@ import java.io.IOException; import java.io.File; import java.util.Arrays; +import java.util.HashMap; import java.util.List; public class DefaultLogStreamProcessorTest extends com.pinterest.singer.SingerTestBase { @@ -307,6 +309,46 @@ public void testProcessLogStreamWithDecider() throws Exception { } } + @Test + public void testDisableDecider() throws Exception { + DefaultLogStreamProcessor processor = null; + SingerUtils.setHostname("localhost"); + try { + SingerConfig singerConfig = new SingerConfig(); + singerConfig.setThreadPoolSize(1); + singerConfig.setWriterThreadPoolSize(1); + SingerSettings.initialize(singerConfig); + SingerLog singerLog = new SingerLog( + new SingerLogConfig("test", getTempPath(), "thrift.log", null, null, null)); + LogStream logStream = new LogStream(singerLog, "thrift.log"); + NoOpLogStreamWriter writer = new NoOpLogStreamWriter(); + processor = new DefaultLogStreamProcessor( + logStream, + "singer_test_decider", + new DefaultLogStreamReader( + logStream, + new ThriftLogFileReaderFactory(new ThriftReaderConfig(16000, 16000))), + writer, + 50, 1, 1, 3600, 1800); + Decider.setInstance(new HashMap<>()); + Decider.getInstance().getDeciderMap().put("singer_test_decider", 100); + assertEquals(true, processor.isLoggingAllowedByDecider()); + + Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost___decider", 100); + assertEquals(false, processor.isLoggingAllowedByDecider()); + + Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost___decider", 50); + assertEquals(true, processor.isLoggingAllowedByDecider()); + + } catch (Exception e) { + e.printStackTrace(); + fail("Unexpected exception"); + } finally { + if (processor != null) { + processor.close(); + } + } + } private static List getMessages(List messageAndPositions) { List messages = Lists.newArrayListWithExpectedSize(messageAndPositions.size()); for (LogMessageAndPosition messageAndPosition : messageAndPositions) { From 4bd8e2f7f2ab1418902ab52530a62af9a4653578 Mon Sep 17 00:00:00 2001 From: Jesus Zuniga Date: Fri, 19 Jan 2024 16:24:02 -0800 Subject: [PATCH 3/8] refactor implementation --- .../com/pinterest/singer/config/Decider.java | 28 ++++++------------- .../processor/DefaultLogStreamProcessor.java | 20 ++++++++----- .../pinterest/singer/utils/SingerUtils.java | 25 ++++++++++++++++- .../DefaultLogStreamProcessorTest.java | 5 ++-- 4 files changed, 49 insertions(+), 29 deletions(-) diff --git a/singer/src/main/java/com/pinterest/singer/config/Decider.java b/singer/src/main/java/com/pinterest/singer/config/Decider.java index fd7d500e..e14ae2f5 100644 --- a/singer/src/main/java/com/pinterest/singer/config/Decider.java +++ b/singer/src/main/java/com/pinterest/singer/config/Decider.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.math.BigInteger; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -110,30 +111,19 @@ public Map getDeciderMap() { } /*** - * Given a log name, return the decider name that is used to disable the log. The disable decider + * Given a log name, return a list of possible decider names used to disable the log. The disable decider * name is required to be in the format of "singer_disable_logName___HOSTNAMEPREFIX___decider". - * Additionally, if there are multiple deciders that match this format , the one with the largest - * character count will be returned. * * @param logName - * @return the disable decider name if it exists, null otherwise + * @return a list of disable deciders */ - public String getDisableDecider(String logName) { - Set disableDeciderList = new HashSet<>(); - String deciderName = null; - for (String key : mDeciderMap.keySet()) { - if (key.startsWith("singer_disable_" + logName + "___")) { - disableDeciderList.add(key); - } - } - String convertedHostname = SingerUtils.HOSTNAME.replace("-", "_"); - for (String decider : disableDeciderList) { - if (convertedHostname.startsWith(decider.split("___")[1])) { - deciderName = - deciderName != null && deciderName.length() > decider.length() ? deciderName : decider; - } + public List generateDisableDeciders(String logName) { + List disableDeciderList = new ArrayList<>(); + for (int i = SingerUtils.HOSTNAME_PREFIXES.size() - 1; i >= 0; i--) { + String convertedHostname = SingerUtils.HOSTNAME_PREFIXES.get(i).replace("-", "_"); + disableDeciderList.add("singer_disable_" + logName.replaceAll("[^a-zA-Z0-9]", "_") + "___" + convertedHostname + "___decider"); } - return deciderName; + return disableDeciderList; } /** diff --git a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java index 79207ff9..f3c1adae 100644 --- a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java +++ b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java @@ -69,7 +69,7 @@ public class DefaultLogStreamProcessor implements LogStreamProcessor, Runnable { private final String logDecider; // Decider used in conjunction with logDecider to disable the logstream at a fleet level - private final String logstreamNameForDecider; + private final List disableDeciders; // LogStream to be processed. protected final LogStream logStream; @@ -168,8 +168,9 @@ public DefaultLogStreamProcessor( this.exceedTimeSliceLimit = false; this.lastModificationTimeProcessed = new AtomicLong(-1); this.lastCompletedCycleTime = new AtomicLong(-1); - this.logstreamNameForDecider = - logStream.getSingerLog().getSingerLogConfig().getName().replaceAll("[^a-zA-Z0-9]", "_"); + this.disableDeciders = + Decider.getInstance().generateDisableDeciders( + this.logStream.getSingerLog().getSingerLogConfig().getName()); } @Override @@ -258,10 +259,15 @@ boolean isLoggingAllowedByDecider() { if (map.containsKey(logDecider)) { result = map.get(logDecider) != 0; } - String disableDecider = Decider.getInstance().getDisableDecider(logstreamNameForDecider); - if (result && disableDecider != null && map.get(disableDecider) == 100) { - LOG.info("Disabling log stream {} because fleet disable decider is set to 100", logStream.getLogStreamName()); - result = false; + if (result && disableDeciders != null) { + for (String disableDecider : disableDeciders) { + if (map.containsKey(disableDecider) && map.get(disableDecider) == 100) { + LOG.info("Disabling log stream {} because fleet disable decider {} is set to 100", + logStream.getLogStreamName(), disableDecider); + result = false; + break; + } + } } } return result; diff --git a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java index 317cacaf..6d081775 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java @@ -34,9 +34,11 @@ import java.security.NoSuchAlgorithmException; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.Date; +import java.util.List; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.configuration.ConfigurationException; @@ -76,6 +78,7 @@ public class SingerUtils { public static final FileSystem defaultFileSystem = FileSystems.getDefault(); public static String HOSTNAME = getHostname(); + public static List HOSTNAME_PREFIXES = getHostnamePrefixes(); public static String getHostname() { String hostName; @@ -92,6 +95,25 @@ public static String getHostname() { return hostName; } + /*** + * Gradually builds substrings from hostname separated by dashes + * will return hostname if hostname does not contain dashes + * + * @param + * @return a list of hostname prefixes + */ + public static List getHostnamePrefixes() { + List hostPrefixes = new ArrayList<>(); + String [] splitHostname = HOSTNAME.split("-"); + StringBuilder currentPrefix = new StringBuilder(); + for (String prefix : splitHostname) { + currentPrefix.append(prefix); + hostPrefixes.add(currentPrefix.toString()); + currentPrefix.append("-"); + } + return hostPrefixes; + } + public static Path getPath(String filePathStr) { return defaultFileSystem.getPath(filePathStr); } @@ -347,6 +369,7 @@ public static void deleteRecursively(File baseDir) { @VisibleForTesting public static void setHostname(String hostname) { HOSTNAME = hostname; + HOSTNAME_PREFIXES = getHostnamePrefixes(); } - + } diff --git a/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java b/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java index 2943ff31..d8229020 100644 --- a/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java +++ b/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java @@ -312,7 +312,7 @@ public void testProcessLogStreamWithDecider() throws Exception { @Test public void testDisableDecider() throws Exception { DefaultLogStreamProcessor processor = null; - SingerUtils.setHostname("localhost"); + SingerUtils.setHostname("localhost-19970722"); try { SingerConfig singerConfig = new SingerConfig(); singerConfig.setThreadPoolSize(1); @@ -338,7 +338,8 @@ public void testDisableDecider() throws Exception { assertEquals(false, processor.isLoggingAllowedByDecider()); Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost___decider", 50); - assertEquals(true, processor.isLoggingAllowedByDecider()); + Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost_19970722___decider", 100); + assertEquals(false, processor.isLoggingAllowedByDecider()); } catch (Exception e) { e.printStackTrace(); From 9c1c30e67cab17194a274238bdd88d41c6f44063 Mon Sep 17 00:00:00 2001 From: Jesus Zuniga Date: Fri, 19 Jan 2024 16:39:29 -0800 Subject: [PATCH 4/8] reset hostname after test --- .../singer/processor/DefaultLogStreamProcessorTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java b/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java index d8229020..116550d7 100644 --- a/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java +++ b/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java @@ -349,6 +349,7 @@ public void testDisableDecider() throws Exception { processor.close(); } } + SingerUtils.setHostname(SingerUtils.getHostname()); } private static List getMessages(List messageAndPositions) { List messages = Lists.newArrayListWithExpectedSize(messageAndPositions.size()); From 2c3dc017d44e1886862d1821697005f6954f985a Mon Sep 17 00:00:00 2001 From: Jesus Zuniga Date: Tue, 23 Jan 2024 10:54:59 -0800 Subject: [PATCH 5/8] add configurable hostname regex and add metric --- singer-commons/src/main/thrift/config.thrift | 5 +++++ .../processor/DefaultLogStreamProcessor.java | 8 +++++--- .../com/pinterest/singer/utils/SingerUtils.java | 15 ++++++++------- .../processor/DefaultLogStreamProcessorTest.java | 6 +++--- 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/singer-commons/src/main/thrift/config.thrift b/singer-commons/src/main/thrift/config.thrift index 42d0bfb1..6dc5f7e0 100644 --- a/singer-commons/src/main/thrift/config.thrift +++ b/singer-commons/src/main/thrift/config.thrift @@ -422,4 +422,9 @@ struct SingerConfig { */ 27: optional string fsEventQueueImplementation; + /** + * Hostname Prefix regex pattern + */ + 28: optional string hostnamePrefixRegex = "-"; + } diff --git a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java index f3c1adae..bcef29ec 100644 --- a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java +++ b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java @@ -68,7 +68,7 @@ public class DefaultLogStreamProcessor implements LogStreamProcessor, Runnable { // Decider for the log stream. private final String logDecider; - // Decider used in conjunction with logDecider to disable the logstream at a fleet level + // Valid deciders tha can be used in conjunction with logDecider to disable the logstream at a fleet level private final List disableDeciders; // LogStream to be processed. @@ -247,8 +247,8 @@ public long processLogStream() throws LogStreamProcessorException { /** * If the decider is not set, this method will return true. - * If a decider is set, only return false when the decider's value is 0 and disable decider's - * (if exists) value is 100. + * If a decider is set, return false when the decider's value is 0 or when the + * decider's value is != 0 and disable decider's (if exists) value is 100. * * @return true or false. */ @@ -264,6 +264,8 @@ boolean isLoggingAllowedByDecider() { if (map.containsKey(disableDecider) && map.get(disableDecider) == 100) { LOG.info("Disabling log stream {} because fleet disable decider {} is set to 100", logStream.getLogStreamName(), disableDecider); + OpenTsdbMetricConverter.gauge( + "singer.processor.disable_decider_active", 1, "log=" + logStream.getSingerLog().getLogName()); result = false; break; } diff --git a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java index 6d081775..89ae25b2 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java @@ -78,7 +78,7 @@ public class SingerUtils { public static final FileSystem defaultFileSystem = FileSystems.getDefault(); public static String HOSTNAME = getHostname(); - public static List HOSTNAME_PREFIXES = getHostnamePrefixes(); + public static List HOSTNAME_PREFIXES = getHostnamePrefixes("-"); public static String getHostname() { String hostName; @@ -96,15 +96,15 @@ public static String getHostname() { } /*** - * Gradually builds substrings from hostname separated by dashes - * will return hostname if hostname does not contain dashes + * Gradually builds substrings from hostname separated by a given regex, + * will return hostname if hostname can't be split by regex * * @param * @return a list of hostname prefixes */ - public static List getHostnamePrefixes() { + public static List getHostnamePrefixes(String regex) { List hostPrefixes = new ArrayList<>(); - String [] splitHostname = HOSTNAME.split("-"); + String [] splitHostname = HOSTNAME.split(regex); StringBuilder currentPrefix = new StringBuilder(); for (String prefix : splitHostname) { currentPrefix.append(prefix); @@ -253,6 +253,7 @@ public static SingerConfig loadSingerConfig(String singerConfigDir, } LOG.info("Singer config loaded : " + singerConfig); + HOSTNAME_PREFIXES = getHostnamePrefixes(singerConfig.getHostnamePrefixRegex()); return singerConfig; } @@ -367,9 +368,9 @@ public static void deleteRecursively(File baseDir) { } } @VisibleForTesting - public static void setHostname(String hostname) { + public static void setHostname(String hostname, String regex) { HOSTNAME = hostname; - HOSTNAME_PREFIXES = getHostnamePrefixes(); + HOSTNAME_PREFIXES = getHostnamePrefixes(regex); } } diff --git a/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java b/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java index 116550d7..3bbd5c46 100644 --- a/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java +++ b/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java @@ -312,7 +312,7 @@ public void testProcessLogStreamWithDecider() throws Exception { @Test public void testDisableDecider() throws Exception { DefaultLogStreamProcessor processor = null; - SingerUtils.setHostname("localhost-19970722"); + SingerUtils.setHostname("localhost-prod.cluster-19970722", "[.-]"); try { SingerConfig singerConfig = new SingerConfig(); singerConfig.setThreadPoolSize(1); @@ -338,7 +338,7 @@ public void testDisableDecider() throws Exception { assertEquals(false, processor.isLoggingAllowedByDecider()); Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost___decider", 50); - Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost_19970722___decider", 100); + Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost_prod_cluster___decider", 100); assertEquals(false, processor.isLoggingAllowedByDecider()); } catch (Exception e) { @@ -349,7 +349,7 @@ public void testDisableDecider() throws Exception { processor.close(); } } - SingerUtils.setHostname(SingerUtils.getHostname()); + SingerUtils.setHostname(SingerUtils.getHostname(), "-"); } private static List getMessages(List messageAndPositions) { List messages = Lists.newArrayListWithExpectedSize(messageAndPositions.size()); From e171bcf544e9e33327f3b0ff9994597c65fd533f Mon Sep 17 00:00:00 2001 From: Jesus Zuniga Date: Tue, 23 Jan 2024 11:47:25 -0800 Subject: [PATCH 6/8] fix typos --- .../singer/processor/DefaultLogStreamProcessor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java index bcef29ec..0f10acbd 100644 --- a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java +++ b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java @@ -68,7 +68,7 @@ public class DefaultLogStreamProcessor implements LogStreamProcessor, Runnable { // Decider for the log stream. private final String logDecider; - // Valid deciders tha can be used in conjunction with logDecider to disable the logstream at a fleet level + // Valid deciders that can be used in conjunction with logDecider to disable the logstream at a fleet level private final List disableDeciders; // LogStream to be processed. @@ -247,8 +247,8 @@ public long processLogStream() throws LogStreamProcessorException { /** * If the decider is not set, this method will return true. - * If a decider is set, return false when the decider's value is 0 or when the - * decider's value is != 0 and disable decider's (if exists) value is 100. + * If a decider is set, return false when the decider's value is 0 + * and disable decider's (if exists) value is 100. * * @return true or false. */ From 8e34c30bd909f281382b4d650e03fabaa50088a1 Mon Sep 17 00:00:00 2001 From: Jesus Zuniga Date: Tue, 23 Jan 2024 13:15:48 -0800 Subject: [PATCH 7/8] add utils test & address comments --- .../singer/common/SingerMetrics.java | 2 + .../com/pinterest/singer/config/Decider.java | 2 +- .../processor/DefaultLogStreamProcessor.java | 2 +- .../pinterest/singer/utils/SingerUtils.java | 5 ++- .../singer/utils/TestSingerUtils.java | 44 +++++++++++++++++++ 5 files changed, 52 insertions(+), 3 deletions(-) diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java index f392e623..63897e32 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java @@ -61,6 +61,8 @@ public class SingerMetrics { public static final String PROCESSOR_MESSAGE_KEY_SIZE_BYTES = "processor.message.key.size.bytes"; public static final String PROCESSOR_MESSAGE_VALUE_SIZE_BYTES = "processor.message.value.size.bytes"; + public static final String DISABLE_DECIDER_ACTIVE = "singer.processor.disable_decider_active"; + public static final String SKIPPED_BYTES = "singer.reader.skipped_bytes"; public static final String WATERMARK_CREATION_FAILURE = "singer.watermark.creation.failure"; diff --git a/singer/src/main/java/com/pinterest/singer/config/Decider.java b/singer/src/main/java/com/pinterest/singer/config/Decider.java index e14ae2f5..29ae3c30 100644 --- a/singer/src/main/java/com/pinterest/singer/config/Decider.java +++ b/singer/src/main/java/com/pinterest/singer/config/Decider.java @@ -120,7 +120,7 @@ public Map getDeciderMap() { public List generateDisableDeciders(String logName) { List disableDeciderList = new ArrayList<>(); for (int i = SingerUtils.HOSTNAME_PREFIXES.size() - 1; i >= 0; i--) { - String convertedHostname = SingerUtils.HOSTNAME_PREFIXES.get(i).replace("-", "_"); + String convertedHostname = SingerUtils.HOSTNAME_PREFIXES.get(i).replaceAll("[^a-zA-Z0-9]", "_"); disableDeciderList.add("singer_disable_" + logName.replaceAll("[^a-zA-Z0-9]", "_") + "___" + convertedHostname + "___decider"); } return disableDeciderList; diff --git a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java index 0f10acbd..152344bb 100644 --- a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java +++ b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java @@ -265,7 +265,7 @@ boolean isLoggingAllowedByDecider() { LOG.info("Disabling log stream {} because fleet disable decider {} is set to 100", logStream.getLogStreamName(), disableDecider); OpenTsdbMetricConverter.gauge( - "singer.processor.disable_decider_active", 1, "log=" + logStream.getSingerLog().getLogName()); + SingerMetrics.DISABLE_DECIDER_ACTIVE, 1, "log=" + logStream.getSingerLog().getLogName()); result = false; break; } diff --git a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java index 89ae25b2..4756c1c1 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java @@ -96,13 +96,16 @@ public static String getHostname() { } /*** - * Gradually builds substrings from hostname separated by a given regex, + * Gradually builds substrings separated by dashes from hostname given a regex, * will return hostname if hostname can't be split by regex * * @param * @return a list of hostname prefixes */ public static List getHostnamePrefixes(String regex) { + if (regex == null || regex.isEmpty()) { + return Arrays.asList(HOSTNAME); + } List hostPrefixes = new ArrayList<>(); String [] splitHostname = HOSTNAME.split(regex); StringBuilder currentPrefix = new StringBuilder(); diff --git a/singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java b/singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java index d4bc7464..6f89ca20 100644 --- a/singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java +++ b/singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java @@ -16,6 +16,7 @@ package com.pinterest.singer.utils; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import org.junit.Test; @@ -25,6 +26,8 @@ import com.pinterest.singer.thrift.configuration.SingerConfig; import com.pinterest.singer.thrift.configuration.SingerLogConfig; +import java.util.List; + public class TestSingerUtils { @Test @@ -53,4 +56,45 @@ public void testGetHostNameBasedOnConfig() { assertEquals(SingerUtils.getHostname(), hostNameBasedOnConfig); } + @Test + public void testGetHostnamePrefixes() { + // Check simple dashes only + String regex = "-"; + SingerUtils.setHostname("localhost-prod-cluster-19970722", regex); + String [] prefixes = {"localhost", "localhost-prod", "localhost-prod-cluster", "localhost-prod-cluster-19970722"}; + List finalPrefixes = SingerUtils.getHostnamePrefixes(regex); + assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes))); + + // Check dots and dashes + regex = "[.-]"; + SingerUtils.setHostname("localhost-prod.cluster-19970722", regex); + prefixes = new String[]{"localhost", "localhost-prod", "localhost-prod-cluster", "localhost-prod-cluster-19970722"}; + finalPrefixes = SingerUtils.getHostnamePrefixes(regex); + assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes))); + + // Check regex is empty + regex = ""; + SingerUtils.setHostname("localhost-dev.19970722", regex); + prefixes = new String []{"localhost-dev.19970722"}; + finalPrefixes = SingerUtils.getHostnamePrefixes(regex); + assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes))); + + // Check regex is null + regex = null; + SingerUtils.setHostname("localhost-dev.19970722", regex); + prefixes = new String []{"localhost-dev.19970722"}; + finalPrefixes = SingerUtils.getHostnamePrefixes(regex); + assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes))); + + // Check regex is not matched + regex = "abc"; + SingerUtils.setHostname("localhost-dev.19970722", regex); + prefixes = new String []{"localhost-dev.19970722"}; + finalPrefixes = SingerUtils.getHostnamePrefixes(regex); + assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes))); + + // reset hostname + SingerUtils.setHostname(SingerUtils.getHostname(), "-"); + } + } From b1f59f856c8195fb2e17bc6e5d47958be63ff8cd Mon Sep 17 00:00:00 2001 From: Jesus Zuniga Date: Tue, 23 Jan 2024 14:14:03 -0800 Subject: [PATCH 8/8] change comment --- .../pinterest/singer/processor/DefaultLogStreamProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java index 152344bb..be706265 100644 --- a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java +++ b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java @@ -248,7 +248,7 @@ public long processLogStream() throws LogStreamProcessorException { /** * If the decider is not set, this method will return true. * If a decider is set, return false when the decider's value is 0 - * and disable decider's (if exists) value is 100. + * or disable decider's (if exists) value is 100. * * @return true or false. */