diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 41a39efb7baa..2326ed69bed4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -29,7 +29,6 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; -import org.apache.helix.model.InstanceConfig; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; @@ -48,12 +47,13 @@ import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; +import org.apache.pinot.controller.util.ServerInfoCache; +import org.apache.pinot.controller.util.ServerInfoCache.ServerInfo; import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TierConfig; import org.apache.pinot.spi.stream.StreamConfig; -import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -82,6 +82,8 @@ public class SegmentStatusChecker extends ControllerPeriodicTask entry : stateMap.entrySet()) { String serverInstanceId = entry.getKey(); String state = entry.getValue(); - if (context._queryableServers.contains(serverInstanceId) - && (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) { + if (isServerQueryable(serverInstanceId) + && (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) { numEVReplicasUp++; } if (state.equals(SegmentStateModel.ERROR)) { @@ -444,6 +434,14 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } + private boolean isServerQueryable(String serverInstanceId) { + ServerInfo serverInfo = _serverInfoCache.getServerInfo(serverInstanceId); + return serverInfo != null + && serverInfo.isHelixEnabled() + && !serverInfo.isQueriesDisabled() + && !serverInfo.isShutdownInProgress(); + } + private static String logSegments(List segments) { if (segments.size() <= MAX_SEGMENTS_TO_LOG) { return segments.toString(); @@ -491,6 +489,5 @@ public static final class Context { private final Set _processedTables = new HashSet<>(); private final Set _disabledTables = new HashSet<>(); private final Set _pausedTables = new HashSet<>(); - private final Set _queryableServers = new HashSet<>(); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index f009cbddc9ae..e7affa4287d6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -515,11 +515,6 @@ public List getAllControllerInstanceConfigs() { .filter(instance -> InstanceTypeUtils.isController(instance.getId())).collect(Collectors.toList()); } - public List getAllServerInstanceConfigs() { - return HelixHelper.getInstanceConfigs(_helixZkManager).stream() - .filter(instance -> InstanceTypeUtils.isServer(instance.getId())).collect(Collectors.toList()); - } - public List getAllMinionInstanceConfigs() { return HelixHelper.getInstanceConfigs(_helixZkManager).stream() .filter(instance -> InstanceTypeUtils.isMinion(instance.getId())).collect(Collectors.toList()); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerInfoCache.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerInfoCache.java new file mode 100644 index 000000000000..0f2e894cecfe --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerInfoCache.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.util; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.InstanceTypeUtils; + + +/** + * This is a helper class maintaining a cache of server information. It is used to avoid repeated calls to Helix to + * get server information. This class is not thread safe. + */ +public class ServerInfoCache { + PinotHelixResourceManager _pinotHelixResourceManager; + Map _serverInfoMap; + + public ServerInfoCache(PinotHelixResourceManager pinotHelixResourceManager) { + this._pinotHelixResourceManager = pinotHelixResourceManager; + this._serverInfoMap = new HashMap<>(); + } + + public ServerInfo getServerInfo(String instanceId) { + return _serverInfoMap.computeIfAbsent(instanceId, this::getServerInfoOndemand); + } + + private ServerInfo getServerInfoOndemand(String instanceId) { + InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId); + if (instanceConfig == null || !InstanceTypeUtils.isServer(instanceId)) { + return null; + } + List tags = instanceConfig.getTags(); + ZNRecord record = instanceConfig.getRecord(); + boolean helixEnabled = record.getBooleanField( + InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), false); + boolean queriesDisabled = record.getBooleanField(CommonConstants.Helix.QUERIES_DISABLED, false); + boolean shutdownInProgress = record.getBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, false); + return new ServerInfo(instanceId, tags, null, helixEnabled, queriesDisabled, shutdownInProgress); + } + + public static class ServerInfo { + private String _instanceName; + private List _tags; + private List _tables; + private boolean _helixEnabled; + private boolean _queriesDisabled; + private boolean _shutdownInProgress; + private ServerInfo(String instanceName, + List tags, + List tables, + boolean helixEnabled, + boolean queriesDisabled, + boolean shutdownInProgress) { + _instanceName = instanceName; + _tags = tags; + _tables = tables; + _helixEnabled = helixEnabled; + _queriesDisabled = queriesDisabled; + _shutdownInProgress = shutdownInProgress; + } + + public String getInstanceName() { + return _instanceName; + } + + public void setInstanceName(String instanceName) { + _instanceName = instanceName; + } + + public List getTags() { + return _tags; + } + + public void setTags(List tags) { + _tags = tags; + } + + public List getTables() { + return _tables; + } + + public void setTables(List tables) { + _tables = tables; + } + + public boolean isHelixEnabled() { + return _helixEnabled; + } + + public void setHelixEnabled(boolean helixEnabled) { + _helixEnabled = helixEnabled; + } + + public boolean isQueriesDisabled() { + return _queriesDisabled; + } + + public void setQueriesDisabled(boolean queriesDisabled) { + _queriesDisabled = queriesDisabled; + } + + public boolean isShutdownInProgress() { + return _shutdownInProgress; + } + + public void setShutdownInProgress(boolean shutdownInProgress) { + _shutdownInProgress = shutdownInProgress; + } + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index a6c3df63be0d..f41084f1a6ab 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -107,13 +107,7 @@ public void offlineBasicTest() { externalView.setState("myTable_4", "pinot1", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - newQuerableInstanceConfig("pinot1"), - newQuerableInstanceConfig("pinot2"), - newQuerableInstanceConfig("pinot3") - ) - ); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); @@ -224,12 +218,7 @@ public void realtimeBasicTest() { externalView.setState(seg3, "pinot3", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - newQuerableInstanceConfig("pinot1"), - newQuerableInstanceConfig("pinot2"), - newQuerableInstanceConfig("pinot3")) - ); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -295,14 +284,7 @@ public void realtimeMutableSegmentHasLessReplicaTest() { externalView.setState(seg3, "pinot4", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - newQuerableInstanceConfig("pinot1"), - newQuerableInstanceConfig("pinot2"), - newQuerableInstanceConfig("pinot3"), - newQuerableInstanceConfig("pinot4") - ) - ); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -368,14 +350,14 @@ public void realtimeServerNotQueryableTest() { externalView.setState(seg3, "Server_pinot4", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - newQueryDisabledInstanceConfig("Server_pinot1"), - newShutdownInProgressInstanceConfig("Server_pinot2"), - newQuerableInstanceConfig("Server_pinot3"), - newQuerableInstanceConfig("Server_pinot4") - ) - ); + when(resourceManager.getHelixInstanceConfig("Server_pinot1")). + thenReturn(newQueryDisabledInstanceConfig("Server_pinot1")); + when(resourceManager.getHelixInstanceConfig("Server_pinot2")). + thenReturn(newShutdownInProgressInstanceConfig("Server_pinot2")); + when(resourceManager.getHelixInstanceConfig("Server_pinot3")). + thenReturn(newQuerableInstanceConfig("Server_pinot3")); + when(resourceManager.getHelixInstanceConfig("Server_pinot4")). + thenReturn(newQuerableInstanceConfig("Server_pinot4")); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -461,14 +443,7 @@ public void realtimeImmutableSegmentHasLessReplicaTest() { externalView.setState(seg3, "pinot4", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - newQuerableInstanceConfig("pinot1"), - newQuerableInstanceConfig("pinot2"), - newQuerableInstanceConfig("pinot3"), - newQuerableInstanceConfig("pinot4") - ) - ); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -535,13 +510,7 @@ public void missingEVPartitionTest() { externalView.setState("myTable_1", "pinot2", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - newQuerableInstanceConfig("pinot1"), - newQuerableInstanceConfig("pinot2"), - newQuerableInstanceConfig("pinot3") - ) - ); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); @@ -632,12 +601,7 @@ public void missingEVPartitionPushTest() { externalView.setState("myTable_2", "pinot1", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - newQuerableInstanceConfig("pinot1"), - newQuerableInstanceConfig("pinot2") - ) - ); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); @@ -780,8 +744,7 @@ public void lessThanOnePercentSegmentsUnavailableTest() { } PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of(newQuerableInstanceConfig("pinot1"))); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);