
Enhance SegmentStatusChecker to honor no-queryable servers and instance assignment config #14536

Open · wants to merge 10 commits into base: master
Conversation

mqliang
Contributor

@mqliang mqliang commented Nov 25, 2024

ref: #14538

This PR enhances SegmentStatusChecker in two scenarios:

1. If a user has an instance assignment config, offline segments and consuming segments may have different replica groups (e.g. more replicas for consuming segments, fewer for online segments).

In this situation, when calculating PERCENT_OF_REPLICAS, it is problematic to use the global maxISReplicas as the denominator and the global minEVReplicas as the numerator. For example, if the user configures 3 replica groups for immutable segments and 5 for mutable segments, PERCENT_OF_REPLICAS will always be 60% even when all replicas are up, which causes spurious alerts.

This PR changes the logic to calculate EVReplicasUpPercent for each segment and then emit the minimal percentage.

2. When a server is configured as queriesDisabled or shutdownInProgress, brokers do not send queries to that server. Replicas on such non-queryable servers should be treated as OFFLINE even if Helix reports them as ONLINE.
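The per-segment calculation described above can be sketched as follows. This is a minimal illustration, not the actual Pinot code; the two maps are hypothetical stand-ins for per-segment IdealState replica counts and ExternalView up-replica counts:

```java
import java.util.Map;

public class ReplicaPercentSketch {
    // For each segment, compute the percentage of IdealState replicas that are
    // actually up in the ExternalView, then report the minimum across segments.
    // This avoids mixing replica counts of segments from different replica groups.
    public static int minReplicaUpPercent(Map<String, Integer> isReplicas,
                                          Map<String, Integer> evUpReplicas) {
        int minPercent = 100;
        for (Map.Entry<String, Integer> entry : isReplicas.entrySet()) {
            int expected = entry.getValue();
            int up = evUpReplicas.getOrDefault(entry.getKey(), 0);
            if (expected > 0) {
                minPercent = Math.min(minPercent, up * 100 / expected);
            }
        }
        return minPercent;
    }
}
```

With immutable segments at 3/3 replicas and consuming segments at 5/5, this reports 100%, instead of the false 60% that the global min/max approach produces.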

@mqliang mqliang force-pushed the mqliang/max-is-replicas branch from 664d301 to 5697b33 Compare November 25, 2024 20:05
@codecov-commenter

codecov-commenter commented Nov 25, 2024

Codecov Report

Attention: Patch coverage is 71.83099% with 20 lines in your changes missing coverage. Please review.

Project coverage is 63.94%. Comparing base (59551e4) to head (bbba882).
Report is 1427 commits behind head on master.

Files with missing lines Patch % Lines
.../pinot/controller/util/ServerQueryInfoFetcher.java 58.53% 16 Missing and 1 partial ⚠️
...e/pinot/controller/helix/SegmentStatusChecker.java 90.00% 1 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14536      +/-   ##
============================================
+ Coverage     61.75%   63.94%   +2.18%     
- Complexity      207     1573    +1366     
============================================
  Files          2436     2688     +252     
  Lines        133233   147716   +14483     
  Branches      20636    22635    +1999     
============================================
+ Hits          82274    94452   +12178     
- Misses        44911    46326    +1415     
- Partials       6048     6938     +890     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.92% <71.83%> (+2.21%) ⬆️
java-21 63.83% <71.83%> (+2.21%) ⬆️
skip-bytebuffers-false 63.93% <71.83%> (+2.19%) ⬆️
skip-bytebuffers-true 63.81% <71.83%> (+36.08%) ⬆️
temurin 63.94% <71.83%> (+2.18%) ⬆️
unittests 63.93% <71.83%> (+2.19%) ⬆️
unittests1 55.63% <ø> (+8.74%) ⬆️
unittests2 34.59% <71.83%> (+6.86%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Comment on lines 365 to 366
if (numEVConsumingReplicas == 0) { // it's a immutable segment
minEVImmutableReplicas = Math.min(minEVImmutableReplicas, numEVOnlineReplicas);
Contributor


This is problematic because there might be a scenario in which all replicas of a segment whose IS state is CONSUMING are OFFLINE or ERROR. In that scenario, numEVConsumingReplicas is indeed zero, and we should report it as zero.

Contributor Author

@mqliang mqliang Nov 25, 2024


How about changing L365 to

if (numISConsumingReplicas == 0 && numEVConsumingReplicas == 0) { // it's an immutable segment

This should also cover the scenario where a mutable segment has just transitioned to an immutable segment:

  • numISConsumingReplicas == 0 && numEVConsumingReplicas == 0: it must be an immutable segment
  • numISConsumingReplicas != 0 || numEVConsumingReplicas != 0: it's possible that this segment has just transitioned to an immutable segment, but we treat it as a mutable segment
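The proposed classification can be sketched as a small predicate. This is a hypothetical helper for illustration; in the PR the check lives inline in SegmentStatusChecker:

```java
public class SegmentKindSketch {
    // A segment is treated as immutable only when neither the IdealState nor
    // the ExternalView has any CONSUMING replica; otherwise it is treated as
    // mutable, which safely covers a segment mid-transition to immutable.
    public static boolean isImmutable(int numISConsumingReplicas, int numEVConsumingReplicas) {
        return numISConsumingReplicas == 0 && numEVConsumingReplicas == 0;
    }
}
```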

@mqliang mqliang force-pushed the mqliang/max-is-replicas branch from 5697b33 to 1f10cbf Compare November 26, 2024 01:13
@mqliang mqliang changed the title Handle mutable and immutable segments replicas metric separately Enhance SegmentStatusChecker to honor no-queryable servers and instance assignment config Nov 26, 2024
if (minEVReplicas < maxISReplicas) {
LOGGER.warn("Table {} has at least one segment running with only {} replicas, below replication threshold :{}",
tableNameWithType, minEVReplicas, maxISReplicas);
}
Contributor Author

@mqliang mqliang Nov 26, 2024


@sajjad-moradi This log needs to be deleted, because:

  1. This log will always (and falsely) be printed when online and consuming segments have different replica groups.
  2. L388-L392 already log the partially online segments, so this log is redundant.

Contributor

@jasperjiaguo jasperjiaguo Nov 26, 2024


Can we add some new logging instead? Meanwhile, it would be great to have a list of the segments with the minimum replicas and print them out as well; I feel the debuggability of alerts on this metric is sometimes not great. nvm — L388-L392 should be good.

@mqliang mqliang closed this Nov 26, 2024
@mqliang mqliang reopened this Nov 26, 2024
@mqliang mqliang force-pushed the mqliang/max-is-replicas branch from 4c5224a to bf098d3 Compare November 26, 2024 06:59
@mqliang mqliang closed this Nov 26, 2024
@mqliang mqliang reopened this Nov 26, 2024
@mqliang mqliang closed this Nov 26, 2024
@mqliang mqliang reopened this Nov 26, 2024
@mqliang mqliang closed this Nov 26, 2024
@mqliang mqliang reopened this Nov 26, 2024
@jasperjiaguo
Contributor

Discussed internally that we should have an instance config cache and fetch on demand. Meanwhile, do we want two versions of the metric, perhaps config-controlled or both available, in case anyone relies on the old one?

@mqliang mqliang force-pushed the mqliang/max-is-replicas branch from a39185a to 7ef229c Compare November 26, 2024 23:24
@mqliang mqliang force-pushed the mqliang/max-is-replicas branch from 2a90e0e to f13578b Compare November 27, 2024 04:07
@mqliang mqliang force-pushed the mqliang/max-is-replicas branch from f13578b to 57e8bdc Compare November 27, 2024 04:11
_serverStarters.add(startOneServer(i));
BaseServerStarter serverStarter = startOneServer(i);
_serverStarters.add(serverStarter);
_helixAdmin.enableInstance(getHelixClusterName(), serverStarter.getInstanceId(), true);
Contributor Author


ControllerPeriodicTasksIntegrationTest.testSegmentStatusChecker() will fail without this.

@mqliang
Contributor Author

mqliang commented Nov 27, 2024

All other tests succeeded. Only KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest failed, due to a Docker image pulling issue; it's not related to this change.

org.testcontainers.containers.ContainerFetchException: Can't get Docker image: RemoteDockerImage(imageName=confluentinc/cp-kafka:7.2.0, imagePullPolicy=DefaultPullPolicy(), imageNameSubstitutor=org.testcontainers.utility.ImageNameSubstitutor$LogWrappedImageNameSubstitutor@8be6fcf)

full log

Error:  Tests run: 24, Failures: 1, Errors: 0, Skipped: 23, Time elapsed: 277.7 s <<< FAILURE! -- in org.apache.pinot.integration.tests.KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest
Error:  org.apache.pinot.integration.tests.KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.setUp -- Time elapsed: 277.5 s <<< FAILURE!
org.testcontainers.containers.ContainerFetchException: Can't get Docker image: RemoteDockerImage(imageName=confluentinc/cp-kafka:7.2.0, imagePullPolicy=DefaultPullPolicy(), imageNameSubstitutor=org.testcontainers.utility.ImageNameSubstitutor$LogWrappedImageNameSubstitutor@8be6fcf)
	at org.testcontainers.containers.GenericContainer.getDockerImageName(GenericContainer.java:1364)
	at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:351)
	at org.testcontainers.containers.GenericContainer.start(GenericContainer.java:322)
	at org.apache.pinot.integration.tests.kafka.schemaregistry.SchemaRegistryStarter$KafkaSchemaRegistryInstance.start(SchemaRegistryStarter.java:74)
	at org.apache.pinot.integration.tests.kafka.schemaregistry.SchemaRegistryStarter.startLocalInstance(SchemaRegistryStarter.java:46)
	at org.apache.pinot.integration.tests.KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.startSchemaRegistry(KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java:99)
	at org.apache.pinot.integration.tests.KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.startKafka(KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java:88)
	at org.apache.pinot.integration.tests.BaseRealtimeClusterIntegrationTest.setUp(BaseRealtimeClusterIntegrationTest.java:54)
	at org.apache.pinot.integration.tests.KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.setUp(KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java:292)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:141)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethodConsideringTimeout(MethodInvocationHelper.java:71)
	at org.testng.internal.invokers.ConfigInvoker.invokeConfigurationMethod(ConfigInvoker.java:400)
	at org.testng.internal.invokers.ConfigInvoker.invokeConfigurations(ConfigInvoker.java:333)
	at org.testng.internal.invokers.TestMethodWorker.invokeBeforeClassMethods(TestMethodWorker.java:188)
	at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at org.testng.TestRunner.privateRun(TestRunner.java:739)
	at org.testng.TestRunner.run(TestRunner.java:614)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:421)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:413)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:373)
	at org.testng.SuiteRunner.run(SuiteRunner.java:312)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1274)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1208)
	at org.testng.TestNG.runSuites(TestNG.java:1112)
	at org.testng.TestNG.run(TestNG.java:1079)
	at org.apache.maven.surefire.testng.TestNGExecutor.run(TestNGExecutor.java:155)
	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.executeSingleClass(TestNGDirectoryTestSuite.java:102)
	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.execute(TestNGDirectoryTestSuite.java:91)
	at org.apache.maven.surefire.testng.TestNGProvider.invoke(TestNGProvider.java:137)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:385)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
	at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:507)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:495)
Caused by: org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException: Condition org.testcontainers.images.RemoteDockerImage$$Lambda/0x00007fea50d4ab00 was not fulfilled within 2 minutes.
	at org.testcontainers.shaded.org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
	at org.testcontainers.shaded.org.awaitility.core.CallableCondition.await(CallableCondition.java:78)
	at org.testcontainers.shaded.org.awaitility.core.CallableCondition.await(CallableCondition.java:26)
	at org.testcontainers.shaded.org.awaitility.core.ConditionFactory.until(ConditionFactory.java:985)
	at org.testcontainers.shaded.org.awaitility.core.ConditionFactory.until(ConditionFactory.java:954)
	at org.testcontainers.images.RemoteDockerImage.resolve(RemoteDockerImage.java:109)
	at org.testcontainers.images.RemoteDockerImage.resolve(RemoteDockerImage.java:35)
	at org.testcontainers.utility.LazyFuture.getResolvedValue(LazyFuture.java:20)
	at org.testcontainers.utility.LazyFuture.get(LazyFuture.java:41)
	at org.testcontainers.containers.GenericContainer.getDockerImageName(GenericContainer.java:1362)
	... 37 more

Error:  org.apache.pinot.integration.tests.KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest skipped

* This is a helper class that fetches server information from Helix/ZK. It caches the server information to avoid
* repeated ZK access. This class is NOT thread-safe.
*/
public class ServerInfoFetcher {
Contributor


The name is too generic. I suggest renaming it to something like ServerQueryStateFetcher.
Also, this is only used in the SegmentStatusChecker class. Let's make it private to that class (move it into that class); later, if it's needed elsewhere, it can be made public.

Contributor Author


It not only contains state (helix_enabled, query_disabled, shutdown_in_progress), but also some other information (tables on that server, instance tags, and potentially more that @jasperjiaguo will add). So I renamed it ServerQueryInfoFetcher.

In addition, @jasperjiaguo will use ServerQueryInfoFetcher very soon in his other PRs, and since it's under the util package, it's OK for it to be public. There are other util classes, such as SegmentIntervalUtils and AutoAddInvertedIndex, that are used by only one class but are still public.

@@ -80,6 +82,8 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh

private long _lastDisabledTableLogTimestamp = 0;

private ServerInfoFetcher _serverInfoFetcher;
Contributor


Let's not have this as a class member, because the cache inside the class is never cleared. If you create a ServerInfoFetcher object in each run of the segment status checker, there's no need to worry about cache eviction.
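The suggestion amounts to a short-lived, per-run cache. A minimal sketch under that assumption (the `fetch` function here is a hypothetical stand-in for the actual Helix/ZK lookup):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class PerRunCacheSketch<K, V> {
    private final Map<K, V> _cache = new HashMap<>();
    private final Function<K, V> _fetch;

    public PerRunCacheSketch(Function<K, V> fetch) {
        _fetch = fetch;
    }

    // Each periodic-task run constructs a fresh instance, so the cache lives
    // only for one run and never needs explicit eviction.
    public V get(K key) {
        return _cache.computeIfAbsent(key, _fetch);
    }
}
```

Because the object is created per run, stale entries are bounded by the run interval rather than by an eviction policy.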

@mqliang mqliang force-pushed the mqliang/max-is-replicas branch 2 times, most recently from d4d5962 to 24660fb Compare December 4, 2024 21:15
@mqliang
Copy link
Contributor Author

mqliang commented Dec 4, 2024

@sajjad-moradi @jasperjiaguo comments addressed. PTAL.

@mqliang
Copy link
Contributor Author

mqliang commented Dec 4, 2024

cc @Jackie-Jiang

@mqliang mqliang force-pushed the mqliang/max-is-replicas branch from 24660fb to bbba882 Compare December 4, 2024 21:32
numEVReplicas++;
String serverInstanceId = entry.getKey();
String segmentState = entry.getValue();
if (isServerQueryable(serverQueryInfoFetcher.getServerQueryInfo(serverInstanceId))
Contributor


(totally optional) maybe encapsulate this in the serverQueryInfoFetcher as well

String serverInstanceId = entry.getKey();
String segmentState = entry.getValue();
if (isServerQueryable(serverQueryInfoFetcher.getServerQueryInfo(serverInstanceId))
&& (segmentState.equals(SegmentStateModel.ONLINE) || segmentState.equals(SegmentStateModel.CONSUMING))) {
Contributor


(totally optional) We can probably make this a set called acceptableStates and do acceptableStates.contains(segmentState)
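The optional suggestion would look roughly like the sketch below (the `SegmentStateModel` constants are inlined as plain strings here for illustration):

```java
import java.util.Set;

public class AcceptableStatesSketch {
    // States in which a replica counts as serving; a contains() lookup
    // replaces the chained equals() checks.
    private static final Set<String> ACCEPTABLE_STATES = Set.of("ONLINE", "CONSUMING");

    public static boolean isServing(String segmentState) {
        return ACCEPTABLE_STATES.contains(segmentState);
    }
}
```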

@sajjad-moradi
Contributor

@Jackie-Jiang could you please review this?

Contributor

@Jackie-Jiang Jackie-Jiang left a comment


Is this PR still WIP? I don't see the change related to checking instance assignment config

Comment on lines +36 to +37
private PinotHelixResourceManager _pinotHelixResourceManager;
private Map<String, ServerQueryInfo> _cache;
Contributor


(minor) They can be final

return new ServerQueryInfo(instanceId, tags, null, helixEnabled, queriesDisabled, shutdownInProgress);
}

public static class ServerQueryInfo {
Contributor


We shouldn't need setters for this class, and we can make all fields final

Contributor


Seems most of the fields are not used. Are you planning to use them?

private boolean _helixEnabled;
private boolean _queriesDisabled;
private boolean _shutdownInProgress;
private ServerQueryInfo(String instanceName,
Contributor


(format) Add an empty line, and reformat the constructor per Pinot Style

Comment on lines +55 to +56
boolean helixEnabled = record.getBooleanField(
InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), false);
Contributor


Use instanceConfig.getInstanceEnabled().
We can simplify ServerQueryInfo by combining all 3 booleans into one, called queryEnabled.
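Combining the three flags as suggested could look like this sketch (names assumed from the review thread, not the merged code):

```java
public class ServerQueryStateSketch {
    // A server is queryable only when it is Helix-enabled, queries are not
    // disabled on it, and it is not shutting down; one boolean captures all three.
    public static boolean queryEnabled(boolean helixEnabled, boolean queriesDisabled,
                                       boolean shutdownInProgress) {
        return helixEnabled && !queriesDisabled && !shutdownInProgress;
    }
}
```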

_cache = new HashMap<>();
}

public ServerQueryInfo getServerQueryInfo(String instanceId) {
Contributor


Annotate the return as @Nullable

return _cache.computeIfAbsent(instanceId, this::getServerQueryInfoOndemand);
}

private ServerQueryInfo getServerQueryInfoOndemand(String instanceId) {
Contributor


Suggested change
private ServerQueryInfo getServerQueryInfoOndemand(String instanceId) {
@Nullable
private ServerQueryInfo fetchServerQueryInfo(String instanceId) {

numEVReplicas++;
String serverInstanceId = entry.getKey();
String segmentState = entry.getValue();
if (isServerQueryable(serverQueryInfoFetcher.getServerQueryInfo(serverInstanceId))
Contributor


Check segment state first because the overhead for that is much smaller
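The reordering can be sketched as below, with the server lookup modeled as a Supplier so the short-circuit is visible (hypothetical helper, for illustration only):

```java
import java.util.function.Supplier;

public class ShortCircuitSketch {
    // Evaluate the cheap string-state check first; the more expensive server
    // lookup (modeled as a Supplier) only runs when the state already qualifies.
    public static boolean countsAsUp(String segmentState, Supplier<Boolean> serverQueryable) {
        boolean stateOk = segmentState.equals("ONLINE") || segmentState.equals("CONSUMING");
        return stateOk && serverQueryable.get();
    }
}
```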

5 participants