Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance SegmentStatusChecker to honor no-queryable servers and instance assignment config #14536

Merged
merged 12 commits into from
Jan 10, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.util.ServerQueryInfoFetcher;
import org.apache.pinot.controller.util.ServerQueryInfoFetcher.ServerQueryInfo;
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
Expand Down Expand Up @@ -91,7 +93,6 @@ public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager,
super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
controllerMetrics);

_waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
_tableSizeReader = tableSizeReader;
}
Expand Down Expand Up @@ -209,6 +210,8 @@ private void updateTableSizeMetrics(String tableNameWithType)
private void updateSegmentMetrics(String tableNameWithType, TableConfig tableConfig, Context context) {
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);

ServerQueryInfoFetcher serverQueryInfoFetcher = new ServerQueryInfoFetcher(_pinotHelixResourceManager);

IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);

if (idealState == null) {
Expand Down Expand Up @@ -269,10 +272,12 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon

ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType);

// Maximum number of replicas in ideal state
int maxISReplicas = Integer.MIN_VALUE;
// Minimum number of replicas in external view
int minEVReplicas = Integer.MAX_VALUE;
// Maximum number of replicas that is up (ONLINE/CONSUMING) in ideal state
int maxISReplicasUp = Integer.MIN_VALUE;
// Minimum number of replicas that is up (ONLINE/CONSUMING) in external view
int minEVReplicasUp = Integer.MAX_VALUE;
// Minimum percentage of replicas that is up (ONLINE/CONSUMING) in external view
int minEVReplicasUpPercent = 100;
// Total compressed segment size in deep store
long tableCompressedSize = 0;
// Segments without ZK metadata
Expand All @@ -286,18 +291,19 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
List<String> segmentsInvalidStartTime = new ArrayList<>();
List<String> segmentsInvalidEndTime = new ArrayList<>();
for (String segment : segments) {
int numISReplicas = 0;
// Number of replicas in ideal state that is in ONLINE/CONSUMING state
int numISReplicasUp = 0;
for (Map.Entry<String, String> entry : idealState.getInstanceStateMap(segment).entrySet()) {
String state = entry.getValue();
if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) {
numISReplicas++;
numISReplicasUp++;
}
}
// Skip segments not ONLINE/CONSUMING in ideal state
if (numISReplicas == 0) {
// Skip segments with no ONLINE/CONSUMING in ideal state
if (numISReplicasUp == 0) {
continue;
}
maxISReplicas = Math.max(maxISReplicas, numISReplicas);
maxISReplicasUp = Math.max(maxISReplicasUp, numISReplicasUp);

SegmentZKMetadata segmentZKMetadata = _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, segment);
// Skip the segment when it doesn't have ZK metadata. Most likely the segment is just deleted.
Expand Down Expand Up @@ -330,46 +336,49 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}
}

int numEVReplicas = 0;
int numEVReplicasUp = 0;
if (externalView != null) {
Map<String, String> stateMap = externalView.getStateMap(segment);
if (stateMap != null) {
for (Map.Entry<String, String> entry : stateMap.entrySet()) {
String state = entry.getValue();
if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) {
numEVReplicas++;
String serverInstanceId = entry.getKey();
String segmentState = entry.getValue();
if ((segmentState.equals(SegmentStateModel.ONLINE) || segmentState.equals(SegmentStateModel.CONSUMING))
&& isServerQueryable(serverQueryInfoFetcher.getServerQueryInfo(serverInstanceId))) {
numEVReplicasUp++;
}
if (state.equals(SegmentStateModel.ERROR)) {
if (segmentState.equals(SegmentStateModel.ERROR)) {
errorSegments.add(Pair.of(segment, entry.getKey()));
}
}
}
}
if (numEVReplicas == 0) {
if (numEVReplicasUp == 0) {
offlineSegments.add(segment);
} else if (numEVReplicas < numISReplicas) {
} else if (numEVReplicasUp < numISReplicasUp) {
partialOnlineSegments.add(segment);
} else {
// Do not allow nReplicasEV to be larger than nReplicasIS
numEVReplicas = numISReplicas;
// Do not allow numEVReplicasUp to be larger than numISReplicasUp
numEVReplicasUp = numISReplicasUp;
}
minEVReplicas = Math.min(minEVReplicas, numEVReplicas);

minEVReplicasUp = Math.min(minEVReplicasUp, numEVReplicasUp);
// Total number of replicas in ideal state (including ERROR/OFFLINE states)
int numISReplicasTotal = Math.max(idealState.getInstanceStateMap(segment).entrySet().size(), 1);
minEVReplicasUpPercent = Math.min(minEVReplicasUpPercent, numEVReplicasUp * 100 / numISReplicasTotal);
}

if (maxISReplicas == Integer.MIN_VALUE) {
if (maxISReplicasUp == Integer.MIN_VALUE) {
try {
maxISReplicas = Math.max(Integer.parseInt(idealState.getReplicas()), 1);
maxISReplicasUp = Math.max(Integer.parseInt(idealState.getReplicas()), 1);
} catch (NumberFormatException e) {
maxISReplicas = 1;
maxISReplicasUp = 1;
}
}
// Do not allow minEVReplicas to be larger than maxISReplicas
minEVReplicas = Math.min(minEVReplicas, maxISReplicas);

if (minEVReplicas < maxISReplicas) {
LOGGER.warn("Table {} has at least one segment running with only {} replicas, below replication threshold :{}",
tableNameWithType, minEVReplicas, maxISReplicas);
}
Copy link
Contributor Author

@mqliang mqliang Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sajjad-moradi This log needs to be deleted, because:

  1. This log will always (and falsely) be printed when online and consuming segments have different replica groups.
  2. L388-L392 already logs the partial online segments, so this log is redundant.

Copy link
Contributor

@jasperjiaguo jasperjiaguo Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some new logging instead? Also, it would be great if we had a list of the segments with the minimum number of replicas and printed them out as well. I feel the debuggability of alerts on this metric is sometimes not so great. Never mind, [L388-L392] should be good.

// Do not allow minEVReplicasUp to be larger than maxISReplicasUp
minEVReplicasUp = Math.min(minEVReplicasUp, maxISReplicasUp);

int numSegmentsWithoutZKMetadata = segmentsWithoutZKMetadata.size();
if (numSegmentsWithoutZKMetadata > 0) {
LOGGER.warn("Table {} has {} segments without ZK metadata: {}", tableNameWithType, numSegmentsWithoutZKMetadata,
Expand Down Expand Up @@ -402,9 +411,9 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}

// Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicasUp);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS,
minEVReplicas * 100L / maxISReplicas);
minEVReplicasUpPercent);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE,
numErrorSegments);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
Expand All @@ -426,6 +435,13 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}
}

/**
 * Tells whether the given server should be counted as able to serve queries.
 *
 * @param serverInfo server information to inspect; may be {@code null}
 * @return {@code true} only when {@code serverInfo} is non-null, is enabled in Helix, does not have queries
 *         disabled, and is not in the middle of shutting down
 */
private boolean isServerQueryable(ServerQueryInfo serverInfo) {
  // No information, or disabled in Helix: not queryable.
  if (serverInfo == null || !serverInfo.isHelixEnabled()) {
    return false;
  }
  // Queryable unless queries are switched off or the server is shutting down.
  return !(serverInfo.isQueriesDisabled() || serverInfo.isShutdownInProgress());
}

private static String logSegments(List<?> segments) {
if (segments.size() <= MAX_SEGMENTS_TO_LOG) {
return segments.toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.util;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.InstanceTypeUtils;


/**
* This is a helper class that fetches server information from Helix/ZK. It caches the server information to avoid
* repeated ZK access. This class is NOT thread-safe.
*/
public class ServerQueryInfoFetcher {
private final PinotHelixResourceManager _pinotHelixResourceManager;
private final Map<String, ServerQueryInfo> _cache;

/**
* Creates a fetcher that starts with an empty cache.
*
* @param pinotHelixResourceManager resource manager kept for looking up Helix instance configs on demand
*/
public ServerQueryInfoFetcher(PinotHelixResourceManager pinotHelixResourceManager) {
_pinotHelixResourceManager = pinotHelixResourceManager;
// Per-fetcher cache keyed by instance id; a plain HashMap, so no synchronization is provided.
_cache = new HashMap<>();
}

@Nullable
public ServerQueryInfo getServerQueryInfo(String instanceId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Annotate the return as @Nullable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return _cache.computeIfAbsent(instanceId, this::getServerQueryInfoOndemand);
}

@Nullable
private ServerQueryInfo getServerQueryInfoOndemand(String instanceId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private ServerQueryInfo getServerQueryInfoOndemand(String instanceId) {
@Nullable
private ServerQueryInfo fetchServerQueryInfo(String instanceId) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId);
if (instanceConfig == null || !InstanceTypeUtils.isServer(instanceId)) {
return null;
}
List<String> tags = instanceConfig.getTags();
ZNRecord record = instanceConfig.getRecord();
boolean helixEnabled = instanceConfig.getInstanceEnabled();
boolean queriesDisabled = record.getBooleanField(CommonConstants.Helix.QUERIES_DISABLED, false);
boolean shutdownInProgress = record.getBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, false);

return new ServerQueryInfo(instanceId, tags, null, helixEnabled, queriesDisabled, shutdownInProgress);
}

public static class ServerQueryInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't need setters for this class, and we can make all fields final

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems most of the fields are not used. Are you planning to use them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems most of the fields are not used. Are you planning to use them?

Yes, @jasperjiaguo will need those fields very soon

private final String _instanceName;
private final List<String> _tags;
private final List<String> _tables;
private final boolean _helixEnabled;
private final boolean _queriesDisabled;
private final boolean _shutdownInProgress;

/**
* Builds an immutable snapshot of a server's query-related state. Private, so instances are only created by
* the enclosing class.
*
* @param instanceName instance id of the server
* @param tags tags read from the server's instance config
* @param tables tables associated with the server; the enclosing fetcher currently passes {@code null}
* @param helixEnabled whether the instance is enabled in Helix
* @param queriesDisabled whether the queries-disabled flag is set for the instance
* @param shutdownInProgress whether the shutdown-in-progress flag is set for the instance
*/
private ServerQueryInfo(String instanceName, List<String> tags, List<String> tables, boolean helixEnabled,
boolean queriesDisabled, boolean shutdownInProgress) {
_instanceName = instanceName;
_tags = tags;
_tables = tables;
_helixEnabled = helixEnabled;
_queriesDisabled = queriesDisabled;
_shutdownInProgress = shutdownInProgress;
}

/**
* @return the Helix-enabled flag captured when this info was created
*/
public boolean isHelixEnabled() {
return _helixEnabled;
}

/**
* @return the queries-disabled flag captured when this info was created
*/
public boolean isQueriesDisabled() {
return _queriesDisabled;
}

/**
* @return the shutdown-in-progress flag captured when this info was created
*/
public boolean isShutdownInProgress() {
return _shutdownInProgress;
}
}
}
Loading
Loading