From 7cf87468f54b865e64158cd68462e0e97efbf56a Mon Sep 17 00:00:00 2001
From: kezhenxu94 <kezhenxu94@apache.org>
Date: Fri, 24 Nov 2023 15:42:15 +0800
Subject: [PATCH 1/2] Group service endpoints into `_abandoned` when endpoints
 have high cardinality

---
 docs/en/changes/changes.md                    |  2 +
 .../config/group/EndpointNameGrouping.java    | 40 +++++++++++++------
 2 files changed, 30 insertions(+), 12 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 582eb1ba0901..f9d9882f43e2 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -48,6 +48,8 @@
 * Replace Metrics v2 protocol with MQE in UI templates and E2E Test.
 * Fix incorrect apisix metrics otel rules.
 * Support `Scratch The OAP Config Dump`.
+* Group unformatted service endpoints into `_abandoned` when a service's
+  endpoints have high cardinality.
 
 #### UI
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java
index 6d9a850895f9..817348a98b9e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java
@@ -18,17 +18,17 @@
 
 package org.apache.skywalking.oap.server.core.config.group;
 
-import io.vavr.Tuple2;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.ai.pipeline.services.api.HttpUriPattern;
 import org.apache.skywalking.oap.server.ai.pipeline.services.api.HttpUriRecognition;
 import org.apache.skywalking.oap.server.core.config.group.openapi.EndpointGroupingRule4Openapi;
@@ -37,9 +37,14 @@
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.oap.server.library.util.StringFormatGroup;
+import io.vavr.Tuple2;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class EndpointNameGrouping {
+    public static final String ABANDONED_ENDPOINT_NAME = "_abandoned";
+
     /**
      * Endpoint grouping according to local endpoint-name-grouping.yml or associated dynamic configuration.
      */
@@ -61,7 +66,8 @@ public class EndpointNameGrouping {
      * If the URI is formatted by the rules, the value would be the first 10 formatted names.
      * If the URI is unformatted, the value would be an empty queue.
      */
-    private ConcurrentHashMap<String, ConcurrentHashMap<String, ArrayBlockingQueue<String>>> cachedHttpUris = new ConcurrentHashMap<>();
+    private final Map<String/* service */, Map<String/* uri */, Queue<String>/* candidate patterns */>> cachedHttpUris = new ConcurrentHashMap<>();
+    private final Map<String/* service */, Set<String>/* unformatted uris */> unformattedHttpUris = new ConcurrentHashMap<>();
     private final AtomicInteger aiPipelineExecutionCounter = new AtomicInteger(0);
     /**
      * The max number of HTTP URIs per service for further URI pattern recognition.
@@ -90,7 +96,7 @@ public Tuple2<String, Boolean> format(String serviceName, String endpointName) {
         if (!formattedName._2() && quickUriGroupingRule != null) {
             formattedName = formatByQuickUriPattern(serviceName, endpointName);
 
-            ConcurrentHashMap<String, ArrayBlockingQueue<String>> svrHttpUris =
+            Map<String, Queue<String>> svrHttpUris =
                 cachedHttpUris.computeIfAbsent(serviceName, k -> new ConcurrentHashMap<>());
 
             // Only cache first N (determined by maxHttpUrisNumberPerService) URIs per 30 mins.
@@ -99,18 +105,30 @@ public Tuple2<String, Boolean> format(String serviceName, String endpointName) {
                     // Algorithm side should not return a pattern that has no {var} in it else this
                     // code may accidentally retrieve the size 1 queue created by unformatted endpoint
                     // The queue size is 10, which means only cache the first 10 formatted names.
-                    final ArrayBlockingQueue<String> formattedURIs = svrHttpUris.computeIfAbsent(
+                    final Queue<String> formattedURIs = svrHttpUris.computeIfAbsent(
                         formattedName._1(), k -> new ArrayBlockingQueue<>(10));
-                    if (formattedURIs.size() < 10) {
-                        // Try to push the raw URI as a candidate of formatted name.
-                        formattedURIs.offer(endpointName);
-                    }
+                    // Try to push the raw URI as a candidate of formatted name.
+                    formattedURIs.offer(endpointName);
                 } else {
                     svrHttpUris.computeIfAbsent(endpointName, k -> new ArrayBlockingQueue<>(1));
                 }
             }
         }
 
+        // If there are too many unformatted URIs, we will abandon the unformatted URIs to reduce
+        // the load of OAP and storage.
+        final var unformattedUrisOfService =
+            unformattedHttpUris.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet());
+        if (!formattedName._2()) {
+            if (unformattedUrisOfService.size() < maxHttpUrisNumberPerService) {
+                unformattedUrisOfService.add(endpointName);
+            } else {
+                formattedName = new Tuple2<>(ABANDONED_ENDPOINT_NAME, true);
+            }
+        } else {
+            unformattedUrisOfService.remove(endpointName);
+        }
+
         return formattedName;
     }
 
@@ -204,8 +222,6 @@ public void startHttpUriRecognitionSvr(final HttpUriRecognition httpUriRecogniti
                                              final List<HttpUriPattern> patterns
                                                  = httpUriRecognitionSvr.fetchAllPatterns(service.getName());
                                              if (CollectionUtils.isNotEmpty(patterns)) {
-                                                 StringFormatGroup group = new StringFormatGroup(
-                                                     patterns.size());
                                                  patterns.forEach(
                                                      p -> quickUriGroupingRule.addRule(
                                                          service.getName(), p.getPattern()));

From 2b35ab1e4812019c59f11074a5b915464d805adc Mon Sep 17 00:00:00 2001
From: kezhenxu94 <kezhenxu94@apache.org>
Date: Sat, 25 Nov 2023 13:13:23 +0800
Subject: [PATCH 2/2] Add cache expiration for unformatted URIs

---
 .../core/config/group/EndpointNameGrouping.java   | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java
index 817348a98b9e..5647c30590ff 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.config.group;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +38,9 @@
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.oap.server.library.util.StringFormatGroup;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import io.vavr.Tuple2;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -67,7 +71,13 @@ public class EndpointNameGrouping {
      * If the URI is unformatted, the value would be an empty queue.
      */
     private final Map<String/* service */, Map<String/* uri */, Queue<String>/* candidate patterns */>> cachedHttpUris = new ConcurrentHashMap<>();
-    private final Map<String/* service */, Set<String>/* unformatted uris */> unformattedHttpUris = new ConcurrentHashMap<>();
+    private final LoadingCache<String/* service */, Set<String>/* unformatted uris */> unformattedHttpUrisCache = 
+        CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(10)).build(new CacheLoader<>() {
+            @Override
+            public Set<String> load(String service) {
+                return ConcurrentHashMap.newKeySet();
+            }
+        });
     private final AtomicInteger aiPipelineExecutionCounter = new AtomicInteger(0);
     /**
      * The max number of HTTP URIs per service for further URI pattern recognition.
@@ -117,8 +127,7 @@ public Tuple2<String, Boolean> format(String serviceName, String endpointName) {
 
         // If there are too many unformatted URIs, we will abandon the unformatted URIs to reduce
         // the load of OAP and storage.
-        final var unformattedUrisOfService =
-            unformattedHttpUris.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet());
+        final var unformattedUrisOfService = unformattedHttpUrisCache.getUnchecked(serviceName);
         if (!formattedName._2()) {
             if (unformattedUrisOfService.size() < maxHttpUrisNumberPerService) {
                 unformattedUrisOfService.add(endpointName);