From 87308b446a3a5e385e1b4edd80f365de64c7d2f8 Mon Sep 17 00:00:00 2001 From: Mingqiang Liang Date: Mon, 25 Nov 2024 15:54:28 -0800 Subject: [PATCH 01/12] Fix PERCENT_OF_REPLICAS metric --- .../helix/SegmentStatusChecker.java | 58 +++---- .../helix/SegmentStatusCheckerTest.java | 162 +++++++++++++++++- 2 files changed, 183 insertions(+), 37 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index c9a48022c0be..61019a30e336 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -269,10 +269,12 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType); - // Maximum number of replicas in ideal state - int maxISReplicas = Integer.MIN_VALUE; - // Minimum number of replicas in external view - int minEVReplicas = Integer.MAX_VALUE; + // Maximum number of replicas that is up (ONLINE/CONSUMING) in ideal state + int maxISReplicasUp = Integer.MIN_VALUE; + // Minimum number of replicas that is up (ONLINE/CONSUMING) in external view + int minEVReplicasUp = Integer.MAX_VALUE; + // Minimum percentage of replicas that is up (ONLINE/CONSUMING) in external view + int minEVReplicasUpPercent = 100; // Total compressed segment size in deep store long tableCompressedSize = 0; // Segments without ZK metadata @@ -286,18 +288,19 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon List segmentsInvalidStartTime = new ArrayList<>(); List segmentsInvalidEndTime = new ArrayList<>(); for (String segment : segments) { - int numISReplicas = 0; + // Number of replicas in ideal state that is in ONLINE/CONSUMING state + int numISReplicasUp = 0; for (Map.Entry entry : idealState.getInstanceStateMap(segment).entrySet()) { String state = entry.getValue(); if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) { - numISReplicas++; + numISReplicasUp++; } } - // Skip segments not ONLINE/CONSUMING in ideal state - if (numISReplicas == 0) { + // Skip segments with no ONLINE/CONSUMING in ideal state + if (numISReplicasUp == 0) { continue; } - maxISReplicas = Math.max(maxISReplicas, numISReplicas); + maxISReplicasUp = Math.max(maxISReplicasUp, numISReplicasUp); SegmentZKMetadata segmentZKMetadata = _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, segment); // Skip the segment when it doesn't have ZK metadata. Most likely the segment is just deleted. @@ -330,14 +333,14 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } - int numEVReplicas = 0; + int numEVReplicasUp = 0; if (externalView != null) { Map stateMap = externalView.getStateMap(segment); if (stateMap != null) { for (Map.Entry entry : stateMap.entrySet()) { String state = entry.getValue(); if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) { - numEVReplicas++; + numEVReplicasUp++; } if (state.equals(SegmentStateModel.ERROR)) { errorSegments.add(Pair.of(segment, entry.getKey())); @@ -345,31 +348,29 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } } - if (numEVReplicas == 0) { + if (numEVReplicasUp == 0) { offlineSegments.add(segment); - } else if (numEVReplicas < numISReplicas) { + } else if (numEVReplicasUp < numISReplicasUp) { partialOnlineSegments.add(segment); - } else { - // Do not allow nReplicasEV to be larger than nReplicasIS - numEVReplicas = numISReplicas; } - minEVReplicas = Math.min(minEVReplicas, numEVReplicas); + + minEVReplicasUp = Math.min(minEVReplicasUp, numEVReplicasUp); + // Number of replicas in ideal state (including ERROR/OFFLINE states) + int numISReplicas = idealState.getInstanceStateMap(segment).entrySet().size(); + minEVReplicasUpPercent = Math.min(minEVReplicasUpPercent, numEVReplicasUp * 100 / numISReplicas); } - if (maxISReplicas == Integer.MIN_VALUE) { + if (maxISReplicasUp == Integer.MIN_VALUE) { try { - maxISReplicas = Math.max(Integer.parseInt(idealState.getReplicas()), 1); + maxISReplicasUp = Math.max(Integer.parseInt(idealState.getReplicas()), 1); } catch (NumberFormatException e) { - maxISReplicas = 1; + maxISReplicasUp = 1; } } - // Do not allow minEVReplicas to be larger than maxISReplicas - minEVReplicas = Math.min(minEVReplicas, maxISReplicas); - if (minEVReplicas < maxISReplicas) { - LOGGER.warn("Table {} has at least one segment running with only {} replicas, below replication threshold :{}", - tableNameWithType, minEVReplicas, maxISReplicas); - } + // Do not allow minEVReplicasUp to be larger than maxISReplicas + minEVReplicasUp = Math.min(minEVReplicasUp, maxISReplicasUp); + int numSegmentsWithoutZKMetadata = segmentsWithoutZKMetadata.size(); if (numSegmentsWithoutZKMetadata > 0) { LOGGER.warn("Table {} has {} segments without ZK metadata: {}", tableNameWithType, numSegmentsWithoutZKMetadata, @@ -402,9 +403,8 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge - _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas); - _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, - minEVReplicas * 100L / maxISReplicas); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicasUp); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, minEVReplicasUpPercent); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, numErrorSegments); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index 5f2ae7ea32f4..4b2f18f7285e 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -47,6 +47,9 @@ import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; @@ -56,14 +59,9 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.annotations.Test; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; @SuppressWarnings("unchecked") @@ -196,9 +194,11 @@ public void realtimeBasicTest() { idealState.setPartitionState(seg1, "pinot1", "ONLINE"); idealState.setPartitionState(seg1, "pinot2", "ONLINE"); idealState.setPartitionState(seg1, "pinot3", "ONLINE"); + idealState.setPartitionState(seg2, "pinot1", "ONLINE"); idealState.setPartitionState(seg2, "pinot2", "ONLINE"); idealState.setPartitionState(seg2, "pinot3", "ONLINE"); + idealState.setPartitionState(seg3, "pinot1", "CONSUMING"); idealState.setPartitionState(seg3, "pinot2", "CONSUMING"); idealState.setPartitionState(seg3, "pinot3", "OFFLINE"); @@ -209,9 +209,11 @@ public void realtimeBasicTest() { externalView.setState(seg1, "pinot1", "ONLINE"); externalView.setState(seg1, "pinot2", "ONLINE"); externalView.setState(seg1, "pinot3", "ONLINE"); + externalView.setState(seg2, "pinot1", "CONSUMING"); externalView.setState(seg2, "pinot2", "ONLINE"); externalView.setState(seg2, "pinot3", "CONSUMING"); + externalView.setState(seg3, "pinot1", "CONSUMING"); externalView.setState(seg3, "pinot2", "CONSUMING"); externalView.setState(seg3, "pinot3", "OFFLINE"); @@ -239,6 +241,150 @@ public void realtimeBasicTest() { ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); } + @Test + public void realtimeMutableSegmnetHasLessReplicaTest() { + InstanceAssignmentConfig instanceAssignmentConfig = + new InstanceAssignmentConfig(new InstanceTagPoolConfig("DefaultTenant", true, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 4, 4, 0, 0, 0, false, null), null, false); + + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn") + .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()) + .setInstanceAssignmentConfigMap( + Map.of("CONSUMING", instanceAssignmentConfig) + ) + .build(); + + String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName(); + String seg2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, System.currentTimeMillis()).getSegmentName(); + String seg3 = new LLCSegmentName(RAW_TABLE_NAME, 2, 1, System.currentTimeMillis()).getSegmentName(); + IdealState idealState = new IdealState(REALTIME_TABLE_NAME); + idealState.setPartitionState(seg1, "pinot1", "ONLINE"); + idealState.setPartitionState(seg1, "pinot2", "ONLINE"); + idealState.setPartitionState(seg1, "pinot3", "ONLINE"); + + idealState.setPartitionState(seg2, "pinot1", "ONLINE"); + idealState.setPartitionState(seg2, "pinot2", "ONLINE"); + idealState.setPartitionState(seg2, "pinot3", "ONLINE"); + + idealState.setPartitionState(seg3, "pinot1", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot2", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot3", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot4", "OFFLINE"); + + idealState.setReplicas("3"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + + ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME); + externalView.setState(seg1, "pinot1", "ONLINE"); + externalView.setState(seg1, "pinot2", "ONLINE"); + externalView.setState(seg1, "pinot3", "ONLINE"); + + externalView.setState(seg2, "pinot1", "CONSUMING"); + externalView.setState(seg2, "pinot2", "ONLINE"); + externalView.setState(seg2, "pinot3", "CONSUMING"); + externalView.setState(seg2, "pinot4", "CONSUMING"); + + externalView.setState(seg3, "pinot1", "CONSUMING"); + externalView.setState(seg3, "pinot2", "CONSUMING"); + externalView.setState(seg3, "pinot3", "CONSUMING"); + externalView.setState(seg3, "pinot4", "OFFLINE"); + + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); + when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); + when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(externalView); + SegmentZKMetadata committedSegmentZKMetadata = mockCommittedSegmentZKMetadata(); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg1)).thenReturn(committedSegmentZKMetadata); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg2)).thenReturn(committedSegmentZKMetadata); + SegmentZKMetadata consumingSegmentZKMetadata = mockConsumingSegmentZKMetadata(11111L); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg3)).thenReturn(consumingSegmentZKMetadata); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + ZNRecord znRecord = new ZNRecord("0"); + znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); + when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(REALTIME_TABLE_NAME, 3, 3, 3, 3, 75, 0, 100, 0, 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, REALTIME_TABLE_NAME, + ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); + } + + @Test + public void realtimeImmutableSegmnetHasLessReplicaTest() { + InstanceAssignmentConfig instanceAssignmentConfig = + new InstanceAssignmentConfig(new InstanceTagPoolConfig("DefaultTenant", true, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 4, 4, 0, 0, 0, false, null), null, false); + + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn") + .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()) + .setInstanceAssignmentConfigMap( + Map.of("CONSUMING", instanceAssignmentConfig) + ) + .build(); + + String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName(); + String seg2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, System.currentTimeMillis()).getSegmentName(); + String seg3 = new LLCSegmentName(RAW_TABLE_NAME, 2, 1, System.currentTimeMillis()).getSegmentName(); + IdealState idealState = new IdealState(REALTIME_TABLE_NAME); + idealState.setPartitionState(seg1, "pinot1", "ONLINE"); + idealState.setPartitionState(seg1, "pinot2", "ONLINE"); + idealState.setPartitionState(seg1, "pinot3", "ONLINE"); + + idealState.setPartitionState(seg2, "pinot1", "ONLINE"); + idealState.setPartitionState(seg2, "pinot2", "ONLINE"); + idealState.setPartitionState(seg2, "pinot3", "ONLINE"); + + idealState.setPartitionState(seg3, "pinot1", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot2", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot3", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot4", "OFFLINE"); + + idealState.setReplicas("3"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + + ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME); + externalView.setState(seg1, "pinot1", "ONLINE"); + externalView.setState(seg1, "pinot2", "ONLINE"); + externalView.setState(seg1, "pinot3", "OFFLINE"); + + externalView.setState(seg2, "pinot1", "CONSUMING"); + externalView.setState(seg2, "pinot2", "ONLINE"); + externalView.setState(seg2, "pinot3", "CONSUMING"); + externalView.setState(seg2, "pinot4", "CONSUMING"); + + externalView.setState(seg3, "pinot1", "CONSUMING"); + externalView.setState(seg3, "pinot2", "CONSUMING"); + externalView.setState(seg3, "pinot3", "CONSUMING"); + externalView.setState(seg3, "pinot4", "OFFLINE"); + + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); + when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); + when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(externalView); + SegmentZKMetadata committedSegmentZKMetadata = mockCommittedSegmentZKMetadata(); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg1)).thenReturn(committedSegmentZKMetadata); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg2)).thenReturn(committedSegmentZKMetadata); + SegmentZKMetadata consumingSegmentZKMetadata = mockConsumingSegmentZKMetadata(11111L); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg3)).thenReturn(consumingSegmentZKMetadata); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + ZNRecord znRecord = new ZNRecord("0"); + znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); + when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(REALTIME_TABLE_NAME, 3, 3, 3, 2, 66, 0, 100, 1, 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, REALTIME_TABLE_NAME, + ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); + } + private Map getStreamConfigMap() { return Map.of("streamType", "kafka", "stream.kafka.consumer.type", "simple", "stream.kafka.topic.name", "test", "stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder", From 1f10cbf323daf1a1fca30cf81494bf2e5a9e1f6e Mon Sep 17 00:00:00 2001 From: Mingqiang Liang Date: Mon, 25 Nov 2024 17:13:00 -0800 Subject: [PATCH 02/12] Treat segments on not-queryable servers as down --- .../helix/SegmentStatusChecker.java | 34 +++++- .../helix/SegmentStatusCheckerTest.java | 113 +++++++++++++++--- 2 files changed, 125 insertions(+), 22 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 61019a30e336..1ae54bff1c2f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -29,6 +29,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; @@ -338,8 +339,10 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon Map stateMap = externalView.getStateMap(segment); if (stateMap != null) { for (Map.Entry entry : stateMap.entrySet()) { + String serverInstanceId = entry.getKey(); String state = entry.getValue(); - if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) { + if (isServerQueryable(serverInstanceId, context) && + (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) { numEVReplicasUp++; } if (state.equals(SegmentStateModel.ERROR)) { @@ -355,9 +358,9 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } minEVReplicasUp = Math.min(minEVReplicasUp, numEVReplicasUp); - // Number of replicas in ideal state (including ERROR/OFFLINE states) - int numISReplicas = idealState.getInstanceStateMap(segment).entrySet().size(); - minEVReplicasUpPercent = Math.min(minEVReplicasUpPercent, numEVReplicasUp * 100 / numISReplicas); + // Total number of replicas in ideal state (including ERROR/OFFLINE states) + int numISReplicasTotal = idealState.getInstanceStateMap(segment).entrySet().size(); + minEVReplicasUpPercent = Math.min(minEVReplicasUpPercent, numEVReplicasUp * 100 / numISReplicasTotal); } if (maxISReplicasUp == Integer.MIN_VALUE) { @@ -426,6 +429,28 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } + private boolean isServerQueryable(String instanceId, Context context) { + if (context._serverQueryableMap.containsKey(instanceId)) { + return context._serverQueryableMap.get(instanceId); + } + InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId); + // Instance not found in Helix cluster or not Server we assume it is down + if (instanceConfig == null || !instanceConfig.getInstanceName().startsWith("Server_")) { + LOGGER.warn("Instance {} not found in Helix cluster, or not a server instance", instanceId); + LOGGER.warn("will assume instance is down!"); + context._serverQueryableMap.put(instanceId, false); + return false; + } + + ZNRecord record = instanceConfig.getRecord(); + boolean queriesDisabled = Boolean.valueOf(record.getSimpleField("queriesDisabled")); + boolean shutdownInProgress = Boolean.valueOf(record.getSimpleField("shutdownInProgress")); + boolean queryable = !queriesDisabled && !shutdownInProgress; + + context._serverQueryableMap.put(instanceId, queryable); + return queryable; + } + private static String logSegments(List segments) { if (segments.size() <= MAX_SEGMENTS_TO_LOG) { return segments.toString(); @@ -473,5 +498,6 @@ public static final class Context { private final Set _processedTables = new HashSet<>(); private final Set _disabledTables = new HashSet<>(); private final Set _pausedTables = new HashSet<>(); + private final Map _serverQueryableMap = new HashMap<>(); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index 4b2f18f7285e..f40337369c1c 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -28,6 +28,7 @@ import org.apache.helix.AccessOption; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.lineage.LineageEntry; @@ -47,9 +48,6 @@ import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; -import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; -import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; @@ -109,6 +107,7 @@ public void offlineBasicTest() { externalView.setState("myTable_4", "pinot1", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); @@ -219,6 +218,7 @@ public void realtimeBasicTest() { externalView.setState(seg3, "pinot3", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -242,17 +242,10 @@ public void realtimeBasicTest() { } @Test - public void realtimeMutableSegmnetHasLessReplicaTest() { - InstanceAssignmentConfig instanceAssignmentConfig = - new InstanceAssignmentConfig(new InstanceTagPoolConfig("DefaultTenant", true, 0, null), null, - new InstanceReplicaGroupPartitionConfig(true, 4, 4, 0, 0, 0, false, null), null, false); - + public void realtimeMutableSegmentHasLessReplicaTest() { TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn") .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()) - .setInstanceAssignmentConfigMap( - Map.of("CONSUMING", instanceAssignmentConfig) - ) .build(); String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName(); @@ -291,6 +284,7 @@ public void realtimeMutableSegmnetHasLessReplicaTest() { externalView.setState(seg3, "pinot4", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -314,17 +308,96 @@ public void realtimeMutableSegmnetHasLessReplicaTest() { } @Test - public void realtimeImmutableSegmnetHasLessReplicaTest() { - InstanceAssignmentConfig instanceAssignmentConfig = - new InstanceAssignmentConfig(new InstanceTagPoolConfig("DefaultTenant", true, 0, null), null, - new InstanceReplicaGroupPartitionConfig(true, 4, 4, 0, 0, 0, false, null), null, false); + public void realtimeServerNotQueryableTest() { + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn") + .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()) + .build(); + + String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName(); + String seg2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, System.currentTimeMillis()).getSegmentName(); + String seg3 = new LLCSegmentName(RAW_TABLE_NAME, 2, 1, System.currentTimeMillis()).getSegmentName(); + IdealState idealState = new IdealState(REALTIME_TABLE_NAME); + idealState.setPartitionState(seg1, "Server_pinot1", "ONLINE"); + idealState.setPartitionState(seg1, "Server_pinot2", "ONLINE"); + idealState.setPartitionState(seg1, "Server_pinot3", "ONLINE"); + + idealState.setPartitionState(seg2, "Server_pinot1", "ONLINE"); + idealState.setPartitionState(seg2, "Server_pinot2", "ONLINE"); + idealState.setPartitionState(seg2, "Server_pinot3", "ONLINE"); + + idealState.setPartitionState(seg3, "Server_pinot1", "CONSUMING"); + idealState.setPartitionState(seg3, "Server_pinot2", "CONSUMING"); + idealState.setPartitionState(seg3, "Server_pinot3", "CONSUMING"); + idealState.setPartitionState(seg3, "Server_pinot4", "OFFLINE"); + idealState.setReplicas("3"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + + ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME); + externalView.setState(seg1, "Server_pinot1", "ONLINE"); + externalView.setState(seg1, "Server_pinot2", "ONLINE"); + externalView.setState(seg1, "Server_pinot3", "ONLINE"); + + externalView.setState(seg2, "Server_pinot1", "CONSUMING"); + externalView.setState(seg2, "Server_pinot2", "ONLINE"); + externalView.setState(seg2, "Server_pinot3", "CONSUMING"); + externalView.setState(seg2, "Server_pinot4", "CONSUMING"); + + externalView.setState(seg3, "Server_pinot1", "CONSUMING"); + externalView.setState(seg3, "Server_pinot2", "CONSUMING"); + externalView.setState(seg3, "Server_pinot3", "CONSUMING"); + externalView.setState(seg3, "Server_pinot4", "OFFLINE"); + + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig("Server_pinot1")) + .thenReturn(newQueryDisabledInstanceConfig("Server_pinot1")); + when(resourceManager.getHelixInstanceConfig("Server_pinot2")) + .thenReturn(newShutdownInProgress("Server_pinot2")); + when(resourceManager.getHelixInstanceConfig("Server_pinot3")) + .thenReturn(new InstanceConfig("Server_pinot3")); + when(resourceManager.getHelixInstanceConfig("Server_pinot4")) + .thenReturn(new InstanceConfig("Server_pinot4")); + + when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); + when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); + when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(externalView); + SegmentZKMetadata committedSegmentZKMetadata = mockCommittedSegmentZKMetadata(); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg1)).thenReturn(committedSegmentZKMetadata); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg2)).thenReturn(committedSegmentZKMetadata); + SegmentZKMetadata consumingSegmentZKMetadata = mockConsumingSegmentZKMetadata(11111L); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg3)).thenReturn(consumingSegmentZKMetadata); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + ZNRecord znRecord = new ZNRecord("0"); + znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); + when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(REALTIME_TABLE_NAME, 3, 3, 3, 1, 25, 0, 100, 3, 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, REALTIME_TABLE_NAME, + ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); + } + + private InstanceConfig newQueryDisabledInstanceConfig(String instanceName) { + ZNRecord znRecord = new ZNRecord(instanceName); + znRecord.setSimpleField("queriesDisabled", "true"); + return new InstanceConfig(znRecord); + } + + private InstanceConfig newShutdownInProgress(String instanceName) { + ZNRecord znRecord = new ZNRecord(instanceName); + znRecord.setSimpleField("shutdownInProgress", "true"); + return new InstanceConfig(znRecord); + } + + @Test + public void realtimeImmutableSegmentHasLessReplicaTest() { TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn") .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()) - .setInstanceAssignmentConfigMap( - Map.of("CONSUMING", instanceAssignmentConfig) - ) .build(); String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName(); @@ -363,6 +436,7 @@ public void realtimeImmutableSegmnetHasLessReplicaTest() { externalView.setState(seg3, "pinot4", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -429,6 +503,7 @@ public void missingEVPartitionTest() { externalView.setState("myTable_1", "pinot2", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); @@ -519,6 +594,7 @@ public void missingEVPartitionPushTest() { externalView.setState("myTable_2", "pinot1", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); @@ -661,6 +737,7 @@ public void lessThanOnePercentSegmentsUnavailableTest() { } PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); From 28df5a877f3d33d8fcb0c8ec92f80ccdbdbbb4f0 Mon Sep 17 00:00:00 2001 From: Mingqiang Liang Date: Mon, 25 Nov 2024 17:17:31 -0800 Subject: [PATCH 03/12] fix lint --- .../pinot/controller/helix/SegmentStatusChecker.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 1ae54bff1c2f..d73a2b7e7fb0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -341,8 +341,8 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon for (Map.Entry entry : stateMap.entrySet()) { String serverInstanceId = entry.getKey(); String state = entry.getValue(); - if (isServerQueryable(serverInstanceId, context) && - (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) { + if (isServerQueryable(serverInstanceId, context) + && (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) { numEVReplicasUp++; } if (state.equals(SegmentStateModel.ERROR)) { @@ -407,7 +407,8 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicasUp); - _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, minEVReplicasUpPercent); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, + minEVReplicasUpPercent); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, numErrorSegments); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, From d77cc4e5dd73c1b0e6b7be0c3f0d7b02623e4426 Mon Sep 17 00:00:00 2001 From: Mingqiang Liang Date: Mon, 25 Nov 2024 17:42:39 -0800 Subject: [PATCH 04/12] fix typo in comments --- .../org/apache/pinot/controller/helix/SegmentStatusChecker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index d73a2b7e7fb0..1685a706069d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -371,7 +371,7 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } - // Do not allow minEVReplicasUp to be larger than maxISReplicas + // Do not allow minEVReplicasUp to be larger than maxISReplicasUp minEVReplicasUp = Math.min(minEVReplicasUp, maxISReplicasUp); int numSegmentsWithoutZKMetadata = segmentsWithoutZKMetadata.size(); From bf098d3cd535ebdbf6b121eae6e7adb172225e7a Mon Sep 17 00:00:00 2001 From: Mingqiang Liang Date: Mon, 25 Nov 2024 19:39:24 -0800 Subject: [PATCH 05/12] pre build a queryable server set to reduce ZK acess --- .../helix/SegmentStatusChecker.java | 37 ++++------ .../helix/core/PinotHelixResourceManager.java | 5 ++ .../helix/SegmentStatusCheckerTest.java | 74 ++++++++++++++----- 3 files changed, 75 insertions(+), 41 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 1685a706069d..d03433cb733d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -110,6 +110,17 @@ protected Context preprocess(Properties periodicTaskProperties) { context._logDisabledTables = true; _lastDisabledTableLogTimestamp = now; } + + // Read ZK once to build a set of queryable server instances + for (InstanceConfig instanceConfig : _pinotHelixResourceManager.getAllServerInstanceConfigs()) { + ZNRecord record = instanceConfig.getRecord(); + boolean queriesDisabled = Boolean.valueOf(record.getSimpleField("queriesDisabled")); + boolean shutdownInProgress = Boolean.valueOf(record.getSimpleField("shutdownInProgress")); + if (!queriesDisabled && !shutdownInProgress) { + context._queryableServers.add(instanceConfig.getInstanceName()); + } + } + return context; } @@ -341,7 +352,7 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon for (Map.Entry entry : stateMap.entrySet()) { String serverInstanceId = entry.getKey(); String state = entry.getValue(); - if (isServerQueryable(serverInstanceId, context) + if (context._queryableServers.contains(serverInstanceId) && (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) { numEVReplicasUp++; } @@ -430,28 +441,6 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } - private boolean isServerQueryable(String instanceId, Context context) { - if (context._serverQueryableMap.containsKey(instanceId)) { - return context._serverQueryableMap.get(instanceId); - } - InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId); - // Instance not found in Helix cluster or not Server we assume it is down - if (instanceConfig == null || !instanceConfig.getInstanceName().startsWith("Server_")) { - LOGGER.warn("Instance {} not found in Helix cluster, or not a server instance", instanceId); - LOGGER.warn("will assume instance is down!"); - context._serverQueryableMap.put(instanceId, false); - return false; - } - - ZNRecord record = instanceConfig.getRecord(); - boolean queriesDisabled = Boolean.valueOf(record.getSimpleField("queriesDisabled")); - boolean shutdownInProgress = Boolean.valueOf(record.getSimpleField("shutdownInProgress")); - boolean queryable = !queriesDisabled && !shutdownInProgress; - - context._serverQueryableMap.put(instanceId, queryable); - return queryable; - } - private static String logSegments(List segments) { if (segments.size() <= MAX_SEGMENTS_TO_LOG) { return segments.toString(); @@ -499,6 +488,6 @@ public static final class Context { private final Set _processedTables = new HashSet<>(); private final Set _disabledTables = new HashSet<>(); private final Set _pausedTables = new HashSet<>(); - private final Map _serverQueryableMap = new HashMap<>(); + private final Set _queryableServers = new HashSet<>(); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index e7affa4287d6..f009cbddc9ae 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -515,6 +515,11 @@ public List getAllControllerInstanceConfigs() { .filter(instance -> InstanceTypeUtils.isController(instance.getId())).collect(Collectors.toList()); } + public List getAllServerInstanceConfigs() { + return HelixHelper.getInstanceConfigs(_helixZkManager).stream() + .filter(instance -> InstanceTypeUtils.isServer(instance.getId())).collect(Collectors.toList()); + } + public List getAllMinionInstanceConfigs() { return HelixHelper.getInstanceConfigs(_helixZkManager).stream() .filter(instance -> InstanceTypeUtils.isMinion(instance.getId())).collect(Collectors.toList()); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index f40337369c1c..1eacd2828016 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -107,7 +107,13 @@ public void offlineBasicTest() { externalView.setState("myTable_4", "pinot1", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + new InstanceConfig("pinot1"), + new InstanceConfig("pinot2"), + new InstanceConfig("pinot3"), + new InstanceConfig("pinot4")) + ); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); @@ -218,7 +224,13 @@ public void realtimeBasicTest() { externalView.setState(seg3, "pinot3", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + new InstanceConfig("pinot1"), + new InstanceConfig("pinot2"), + new InstanceConfig("pinot3"), + new InstanceConfig("pinot4")) + ); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -284,7 +296,13 @@ public void realtimeMutableSegmentHasLessReplicaTest() { externalView.setState(seg3, "pinot4", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + new InstanceConfig("pinot1"), + new InstanceConfig("pinot2"), + new InstanceConfig("pinot3"), + new InstanceConfig("pinot4")) + ); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -350,15 +368,13 @@ public void realtimeServerNotQueryableTest() { externalView.setState(seg3, "Server_pinot4", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig("Server_pinot1")) - .thenReturn(newQueryDisabledInstanceConfig("Server_pinot1")); - when(resourceManager.getHelixInstanceConfig("Server_pinot2")) - .thenReturn(newShutdownInProgress("Server_pinot2")); - when(resourceManager.getHelixInstanceConfig("Server_pinot3")) - .thenReturn(new InstanceConfig("Server_pinot3")); - when(resourceManager.getHelixInstanceConfig("Server_pinot4")) - .thenReturn(new InstanceConfig("Server_pinot4")); - + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + newQueryDisabledInstanceConfig("Server_pinot1"), + newShutdownInProgressInstanceConfig("Server_pinot2"), + new InstanceConfig("Server_pinot3"), + new InstanceConfig("Server_pinot4")) + ); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -387,7 +403,7 @@ private InstanceConfig newQueryDisabledInstanceConfig(String instanceName) { return new InstanceConfig(znRecord); } - private InstanceConfig newShutdownInProgress(String instanceName) { + private InstanceConfig newShutdownInProgressInstanceConfig(String instanceName) { ZNRecord znRecord = new ZNRecord(instanceName); znRecord.setSimpleField("shutdownInProgress", "true"); return new InstanceConfig(znRecord); @@ -436,7 +452,13 @@ public void realtimeImmutableSegmentHasLessReplicaTest() { externalView.setState(seg3, "pinot4", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + new InstanceConfig("pinot1"), + new InstanceConfig("pinot2"), + new InstanceConfig("pinot3"), + new InstanceConfig("pinot4")) + ); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -503,7 +525,13 @@ public void missingEVPartitionTest() { externalView.setState("myTable_1", "pinot2", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + new InstanceConfig("pinot1"), + new InstanceConfig("pinot2"), + new InstanceConfig("pinot3"), + new InstanceConfig("pinot4")) + ); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); @@ -594,7 +622,13 @@ public void missingEVPartitionPushTest() { externalView.setState("myTable_2", "pinot1", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + new InstanceConfig("pinot1"), + new InstanceConfig("pinot2"), + new InstanceConfig("pinot3"), + new InstanceConfig("pinot4")) + ); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); @@ -737,7 +771,13 @@ public void lessThanOnePercentSegmentsUnavailableTest() { } PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getHelixInstanceConfig(any())).thenReturn(new InstanceConfig("Server_any")); + when(resourceManager.getAllServerInstanceConfigs()).thenReturn( + List.of( + new InstanceConfig("pinot1"), + new InstanceConfig("pinot2"), + new InstanceConfig("pinot3"), + new InstanceConfig("pinot4")) + ); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); From d5d5973f370d3bc60da99641bfe90c02dab4a623 Mon Sep 17 00:00:00 2001 From: Mingqiang Liang Date: Mon, 25 Nov 2024 23:14:17 -0800 Subject: [PATCH 06/12] check if helix enabled --- .../helix/SegmentStatusChecker.java | 9 ++- .../helix/SegmentStatusCheckerTest.java | 72 ++++++++++--------- 2 files changed, 44 insertions(+), 37 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index d03433cb733d..41a39efb7baa 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -53,6 +53,7 @@ import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TierConfig; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -114,9 +115,11 @@ protected Context preprocess(Properties periodicTaskProperties) { // Read ZK once to build a set of queryable server instances for (InstanceConfig instanceConfig : _pinotHelixResourceManager.getAllServerInstanceConfigs()) { ZNRecord record = instanceConfig.getRecord(); - boolean queriesDisabled = Boolean.valueOf(record.getSimpleField("queriesDisabled")); - boolean shutdownInProgress = Boolean.valueOf(record.getSimpleField("shutdownInProgress")); - if (!queriesDisabled && !shutdownInProgress) { + boolean helixEnabled = record.getBooleanField( + InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), false); + boolean queriesDisabled = record.getBooleanField(Helix.QUERIES_DISABLED, false); + boolean shutdownInProgress = record.getBooleanField(Helix.IS_SHUTDOWN_IN_PROGRESS, false); + if (helixEnabled && !queriesDisabled && !shutdownInProgress) { context._queryableServers.add(instanceConfig.getInstanceName()); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index 1eacd2828016..a6c3df63be0d 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -109,10 +109,10 @@ public void offlineBasicTest() { PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); when(resourceManager.getAllServerInstanceConfigs()).thenReturn( List.of( - new InstanceConfig("pinot1"), - new InstanceConfig("pinot2"), - new InstanceConfig("pinot3"), - new InstanceConfig("pinot4")) + newQuerableInstanceConfig("pinot1"), + newQuerableInstanceConfig("pinot2"), + newQuerableInstanceConfig("pinot3") + ) ); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); @@ -226,10 +226,9 @@ public void realtimeBasicTest() { PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); when(resourceManager.getAllServerInstanceConfigs()).thenReturn( List.of( - new InstanceConfig("pinot1"), - new InstanceConfig("pinot2"), - new InstanceConfig("pinot3"), - new InstanceConfig("pinot4")) + newQuerableInstanceConfig("pinot1"), + newQuerableInstanceConfig("pinot2"), + newQuerableInstanceConfig("pinot3")) ); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); @@ -298,10 +297,11 @@ public void realtimeMutableSegmentHasLessReplicaTest() { PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); when(resourceManager.getAllServerInstanceConfigs()).thenReturn( List.of( - new InstanceConfig("pinot1"), - new InstanceConfig("pinot2"), - new InstanceConfig("pinot3"), - new InstanceConfig("pinot4")) + newQuerableInstanceConfig("pinot1"), + newQuerableInstanceConfig("pinot2"), + newQuerableInstanceConfig("pinot3"), + newQuerableInstanceConfig("pinot4") + ) ); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); @@ -372,8 +372,9 @@ public void realtimeServerNotQueryableTest() { List.of( newQueryDisabledInstanceConfig("Server_pinot1"), newShutdownInProgressInstanceConfig("Server_pinot2"), - new InstanceConfig("Server_pinot3"), - new InstanceConfig("Server_pinot4")) + newQuerableInstanceConfig("Server_pinot3"), + newQuerableInstanceConfig("Server_pinot4") + ) ); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); @@ -399,13 +400,21 @@ public void realtimeServerNotQueryableTest() { private InstanceConfig newQueryDisabledInstanceConfig(String instanceName) { ZNRecord znRecord = new ZNRecord(instanceName); - znRecord.setSimpleField("queriesDisabled", "true"); + znRecord.setBooleanField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), true); + znRecord.setBooleanField(CommonConstants.Helix.QUERIES_DISABLED, true); return new InstanceConfig(znRecord); } private InstanceConfig newShutdownInProgressInstanceConfig(String instanceName) { ZNRecord znRecord = new ZNRecord(instanceName); - znRecord.setSimpleField("shutdownInProgress", "true"); + znRecord.setBooleanField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), true); + znRecord.setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, true); + return new InstanceConfig(znRecord); + } + + private InstanceConfig newQuerableInstanceConfig(String instanceName) { + ZNRecord znRecord = new ZNRecord(instanceName); + znRecord.setBooleanField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), true); return new InstanceConfig(znRecord); } @@ -454,10 +463,11 @@ public void realtimeImmutableSegmentHasLessReplicaTest() { PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); when(resourceManager.getAllServerInstanceConfigs()).thenReturn( List.of( - new InstanceConfig("pinot1"), - new InstanceConfig("pinot2"), - new InstanceConfig("pinot3"), - new InstanceConfig("pinot4")) + newQuerableInstanceConfig("pinot1"), + newQuerableInstanceConfig("pinot2"), + newQuerableInstanceConfig("pinot3"), + newQuerableInstanceConfig("pinot4") + ) ); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); @@ -527,10 +537,10 @@ public void missingEVPartitionTest() { PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); when(resourceManager.getAllServerInstanceConfigs()).thenReturn( List.of( - new InstanceConfig("pinot1"), - new InstanceConfig("pinot2"), - new InstanceConfig("pinot3"), - new InstanceConfig("pinot4")) + newQuerableInstanceConfig("pinot1"), + newQuerableInstanceConfig("pinot2"), + newQuerableInstanceConfig("pinot3") + ) ); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); @@ -624,10 +634,9 @@ public void missingEVPartitionPushTest() { PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); when(resourceManager.getAllServerInstanceConfigs()).thenReturn( List.of( - new InstanceConfig("pinot1"), - new InstanceConfig("pinot2"), - new InstanceConfig("pinot3"), - new InstanceConfig("pinot4")) + newQuerableInstanceConfig("pinot1"), + newQuerableInstanceConfig("pinot2") + ) ); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); @@ -772,12 +781,7 @@ public void lessThanOnePercentSegmentsUnavailableTest() { PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - new InstanceConfig("pinot1"), - new InstanceConfig("pinot2"), - new InstanceConfig("pinot3"), - new InstanceConfig("pinot4")) - ); + List.of(newQuerableInstanceConfig("pinot1"))); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); From 7ef229c45289442932e18e734e8a47d6cc560641 Mon Sep 17 00:00:00 2001 From: Mingqiang Liang Date: Tue, 26 Nov 2024 15:12:35 -0800 Subject: [PATCH 07/12] refactor code by adding ServerInfoCache --- .../helix/SegmentStatusChecker.java | 33 ++--- .../helix/core/PinotHelixResourceManager.java | 5 - .../controller/util/ServerInfoCache.java | 131 ++++++++++++++++++ .../helix/SegmentStatusCheckerTest.java | 67 ++------- 4 files changed, 161 insertions(+), 75 deletions(-) create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerInfoCache.java diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 41a39efb7baa..2326ed69bed4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -29,7 +29,6 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; -import org.apache.helix.model.InstanceConfig; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; @@ -48,12 +47,13 @@ import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; +import org.apache.pinot.controller.util.ServerInfoCache; +import org.apache.pinot.controller.util.ServerInfoCache.ServerInfo; import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TierConfig; import org.apache.pinot.spi.stream.StreamConfig; -import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -82,6 +82,8 @@ public class SegmentStatusChecker extends ControllerPeriodicTask entry : stateMap.entrySet()) { String serverInstanceId = entry.getKey(); String state = entry.getValue(); - if (context._queryableServers.contains(serverInstanceId) - && (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) { + if (isServerQueryable(serverInstanceId) + && (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) { numEVReplicasUp++; } if (state.equals(SegmentStateModel.ERROR)) { @@ -444,6 +434,14 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } + private boolean isServerQueryable(String serverInstanceId) { + ServerInfo serverInfo = _serverInfoCache.getServerInfo(serverInstanceId); + return serverInfo != null + && serverInfo.isHelixEnabled() + && !serverInfo.isQueriesDisabled() + && !serverInfo.isShutdownInProgress(); + } + private static String logSegments(List segments) { if (segments.size() <= MAX_SEGMENTS_TO_LOG) { return segments.toString(); @@ -491,6 +489,5 @@ public static final class Context { private final Set _processedTables = new HashSet<>(); private final Set _disabledTables = new HashSet<>(); private final Set _pausedTables = new HashSet<>(); - private final Set _queryableServers = new HashSet<>(); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index f009cbddc9ae..e7affa4287d6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -515,11 +515,6 @@ public List getAllControllerInstanceConfigs() { .filter(instance -> InstanceTypeUtils.isController(instance.getId())).collect(Collectors.toList()); } - public List getAllServerInstanceConfigs() { - return HelixHelper.getInstanceConfigs(_helixZkManager).stream() - .filter(instance -> InstanceTypeUtils.isServer(instance.getId())).collect(Collectors.toList()); - } - public List getAllMinionInstanceConfigs() { return HelixHelper.getInstanceConfigs(_helixZkManager).stream() .filter(instance -> InstanceTypeUtils.isMinion(instance.getId())).collect(Collectors.toList()); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerInfoCache.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerInfoCache.java new file mode 100644 index 000000000000..d3d1aa60af7b --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerInfoCache.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.util; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.InstanceTypeUtils; + + +/** + * This is a helper class maintaining a cache of server information. It is used to avoid repeated calls to Helix to + * get server information. This class is not thread safe. + */ +public class ServerInfoCache { + private PinotHelixResourceManager _pinotHelixResourceManager; + private Map _serverInfoMap; + + public ServerInfoCache(PinotHelixResourceManager pinotHelixResourceManager) { + _pinotHelixResourceManager = pinotHelixResourceManager; + _serverInfoMap = new HashMap<>(); + } + + public ServerInfo getServerInfo(String instanceId) { + return _serverInfoMap.computeIfAbsent(instanceId, this::getServerInfoOndemand); + } + + private ServerInfo getServerInfoOndemand(String instanceId) { + InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId); + if (instanceConfig == null || !InstanceTypeUtils.isServer(instanceId)) { + return null; + } + List tags = instanceConfig.getTags(); + ZNRecord record = instanceConfig.getRecord(); + boolean helixEnabled = record.getBooleanField( + InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), false); + boolean queriesDisabled = record.getBooleanField(CommonConstants.Helix.QUERIES_DISABLED, false); + boolean shutdownInProgress = record.getBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, false); + return new ServerInfo(instanceId, tags, null, helixEnabled, queriesDisabled, shutdownInProgress); + } + + public static class ServerInfo { + private String _instanceName; + private List _tags; + private List _tables; + private boolean _helixEnabled; + private boolean _queriesDisabled; + private boolean _shutdownInProgress; + private ServerInfo(String instanceName, + List tags, + List tables, + boolean helixEnabled, + boolean queriesDisabled, + boolean shutdownInProgress) { + _instanceName = instanceName; + _tags = tags; + _tables = tables; + _helixEnabled = helixEnabled; + _queriesDisabled = queriesDisabled; + _shutdownInProgress = shutdownInProgress; + } + + public String getInstanceName() { + return _instanceName; + } + + public void setInstanceName(String instanceName) { + _instanceName = instanceName; + } + + public List getTags() { + return _tags; + } + + public void setTags(List tags) { + _tags = tags; + } + + public List getTables() { + return _tables; + } + + public void setTables(List tables) { + _tables = tables; + } + + public boolean isHelixEnabled() { + return _helixEnabled; + } + + public void setHelixEnabled(boolean helixEnabled) { + _helixEnabled = helixEnabled; + } + + public boolean isQueriesDisabled() { + return _queriesDisabled; + } + + public void setQueriesDisabled(boolean queriesDisabled) { + _queriesDisabled = queriesDisabled; + } + + public boolean isShutdownInProgress() { + return _shutdownInProgress; + } + + public void setShutdownInProgress(boolean shutdownInProgress) { + _shutdownInProgress = shutdownInProgress; + } + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index a6c3df63be0d..f41084f1a6ab 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -107,13 +107,7 @@ public void offlineBasicTest() { externalView.setState("myTable_4", "pinot1", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - newQuerableInstanceConfig("pinot1"), - newQuerableInstanceConfig("pinot2"), - newQuerableInstanceConfig("pinot3") - ) - ); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); @@ -224,12 +218,7 @@ public void realtimeBasicTest() { externalView.setState(seg3, "pinot3", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - newQuerableInstanceConfig("pinot1"), - newQuerableInstanceConfig("pinot2"), - newQuerableInstanceConfig("pinot3")) - ); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -295,14 +284,7 @@ public void realtimeMutableSegmentHasLessReplicaTest() { externalView.setState(seg3, "pinot4", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - newQuerableInstanceConfig("pinot1"), - newQuerableInstanceConfig("pinot2"), - newQuerableInstanceConfig("pinot3"), - newQuerableInstanceConfig("pinot4") - ) - ); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -368,14 +350,14 @@ public void realtimeServerNotQueryableTest() { externalView.setState(seg3, "Server_pinot4", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - newQueryDisabledInstanceConfig("Server_pinot1"), - newShutdownInProgressInstanceConfig("Server_pinot2"), - newQuerableInstanceConfig("Server_pinot3"), - newQuerableInstanceConfig("Server_pinot4") - ) - ); + when(resourceManager.getHelixInstanceConfig("Server_pinot1")). + thenReturn(newQueryDisabledInstanceConfig("Server_pinot1")); + when(resourceManager.getHelixInstanceConfig("Server_pinot2")). + thenReturn(newShutdownInProgressInstanceConfig("Server_pinot2")); + when(resourceManager.getHelixInstanceConfig("Server_pinot3")). + thenReturn(newQuerableInstanceConfig("Server_pinot3")); + when(resourceManager.getHelixInstanceConfig("Server_pinot4")). + thenReturn(newQuerableInstanceConfig("Server_pinot4")); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -461,14 +443,7 @@ public void realtimeImmutableSegmentHasLessReplicaTest() { externalView.setState(seg3, "pinot4", "OFFLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - newQuerableInstanceConfig("pinot1"), - newQuerableInstanceConfig("pinot2"), - newQuerableInstanceConfig("pinot3"), - newQuerableInstanceConfig("pinot4") - ) - ); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); @@ -535,13 +510,7 @@ public void missingEVPartitionTest() { externalView.setState("myTable_1", "pinot2", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - newQuerableInstanceConfig("pinot1"), - newQuerableInstanceConfig("pinot2"), - newQuerableInstanceConfig("pinot3") - ) - ); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); @@ -632,12 +601,7 @@ public void missingEVPartitionPushTest() { externalView.setState("myTable_2", "pinot1", "ONLINE"); PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of( - newQuerableInstanceConfig("pinot1"), - newQuerableInstanceConfig("pinot2") - ) - ); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); @@ -780,8 +744,7 @@ public void lessThanOnePercentSegmentsUnavailableTest() { } PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); - when(resourceManager.getAllServerInstanceConfigs()).thenReturn( - List.of(newQuerableInstanceConfig("pinot1"))); + when(resourceManager.getHelixInstanceConfig(any())).thenReturn(newQuerableInstanceConfig("any")); when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); From eec01ccf69ed6000ee74896044be7a0995cbf0f2 Mon Sep 17 00:00:00 2001 From: Mingqiang Liang Date: Tue, 26 Nov 2024 15:42:06 -0800 Subject: [PATCH 08/12] renaming --- .../controller/helix/SegmentStatusChecker.java | 10 +++++----- ...ServerInfoCache.java => ServerInfoFetcher.java} | 14 +++++++------- 2 files changed, 12 insertions(+), 12 deletions(-) rename pinot-controller/src/main/java/org/apache/pinot/controller/util/{ServerInfoCache.java => ServerInfoFetcher.java} (89%) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 2326ed69bed4..92e86615a916 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -47,8 +47,8 @@ import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; -import org.apache.pinot.controller.util.ServerInfoCache; -import org.apache.pinot.controller.util.ServerInfoCache.ServerInfo; +import org.apache.pinot.controller.util.ServerInfoFetcher; +import org.apache.pinot.controller.util.ServerInfoFetcher.ServerInfo; import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -82,7 +82,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask _serverInfoMap; + private Map _serverInfoCache; - public ServerInfoCache(PinotHelixResourceManager pinotHelixResourceManager) { + public ServerInfoFetcher(PinotHelixResourceManager pinotHelixResourceManager) { _pinotHelixResourceManager = pinotHelixResourceManager; - _serverInfoMap = new HashMap<>(); + _serverInfoCache = new HashMap<>(); } public ServerInfo getServerInfo(String instanceId) { - return _serverInfoMap.computeIfAbsent(instanceId, this::getServerInfoOndemand); + return _serverInfoCache.computeIfAbsent(instanceId, this::getServerInfoOndemand); } private ServerInfo getServerInfoOndemand(String instanceId) { From 57e8bdca26b85082cf65acc915071e745840f39c Mon Sep 17 00:00:00 2001 From: Mingqiang Liang Date: Tue, 26 Nov 2024 19:54:56 -0800 Subject: [PATCH 09/12] fix integraiton test failure --- .../apache/pinot/controller/helix/SegmentStatusChecker.java | 5 ++++- .../java/org/apache/pinot/integration/tests/ClusterTest.java | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 92e86615a916..612daec16d4e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -359,11 +359,14 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon offlineSegments.add(segment); } else if (numEVReplicasUp < numISReplicasUp) { partialOnlineSegments.add(segment); + } else { + // Do not allow numEVReplicasUp to be larger than numISReplicasUp + numEVReplicasUp = numISReplicasUp; } minEVReplicasUp = Math.min(minEVReplicasUp, numEVReplicasUp); // Total number of replicas in ideal state (including ERROR/OFFLINE states) - int numISReplicasTotal = idealState.getInstanceStateMap(segment).entrySet().size(); + int numISReplicasTotal = Math.max(idealState.getInstanceStateMap(segment).entrySet().size(), 1); minEVReplicasUpPercent = Math.min(minEVReplicasUpPercent, numEVReplicasUp * 100 / numISReplicasTotal); } diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index 1338e9f529d3..e0f7a6f74f14 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -259,7 +259,9 @@ protected void startServers(int numServers) throws Exception { FileUtils.deleteQuietly(new File(TEMP_SERVER_DIR)); for (int i = 0; i < numServers; i++) { - _serverStarters.add(startOneServer(i)); + BaseServerStarter serverStarter = startOneServer(i); + _serverStarters.add(serverStarter); + _helixAdmin.enableInstance(getHelixClusterName(), serverStarter.getInstanceId(), true); } assertEquals(System.getProperty("user.timezone"), "UTC"); } From bbba882ad11684296748afe92d5837d7c187d54c Mon Sep 17 00:00:00 2001 From: Mingqiang Liang Date: Wed, 4 Dec 2024 13:05:06 -0800 Subject: [PATCH 10/12] address comments --- .../helix/SegmentStatusChecker.java | 21 ++++++++----------- ...tcher.java => ServerQueryInfoFetcher.java} | 20 +++++++++--------- 2 files changed, 19 insertions(+), 22 deletions(-) rename pinot-controller/src/main/java/org/apache/pinot/controller/util/{ServerInfoFetcher.java => ServerQueryInfoFetcher.java} (85%) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 612daec16d4e..5ea13507d9a2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -47,8 +47,8 @@ import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; -import org.apache.pinot.controller.util.ServerInfoFetcher; -import org.apache.pinot.controller.util.ServerInfoFetcher.ServerInfo; +import org.apache.pinot.controller.util.ServerQueryInfoFetcher; +import org.apache.pinot.controller.util.ServerQueryInfoFetcher.ServerQueryInfo; import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -82,8 +82,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask entry : stateMap.entrySet()) { String serverInstanceId = entry.getKey(); - String state = entry.getValue(); - if (isServerQueryable(serverInstanceId) - && (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING))) { + String segmentState = entry.getValue(); + if (isServerQueryable(serverQueryInfoFetcher.getServerQueryInfo(serverInstanceId)) + && (segmentState.equals(SegmentStateModel.ONLINE) || segmentState.equals(SegmentStateModel.CONSUMING))) { numEVReplicasUp++; } - if (state.equals(SegmentStateModel.ERROR)) { + if (segmentState.equals(SegmentStateModel.ERROR)) { errorSegments.add(Pair.of(segment, entry.getKey())); } } @@ -437,8 +435,7 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } - private boolean isServerQueryable(String serverInstanceId) { - ServerInfo serverInfo = _serverInfoFetcher.getServerInfo(serverInstanceId); + private boolean isServerQueryable(ServerQueryInfo serverInfo) { return serverInfo != null && serverInfo.isHelixEnabled() && !serverInfo.isQueriesDisabled() diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerInfoFetcher.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerQueryInfoFetcher.java similarity index 85% rename from pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerInfoFetcher.java rename to pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerQueryInfoFetcher.java index 7b094e6bf91b..819e64087a37 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerInfoFetcher.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerQueryInfoFetcher.java @@ -32,20 +32,20 @@ * This is a helper class that fetch server information from Helix/ZK. It caches the server information to avoid * repeated ZK access. This class is NOT thread-safe. */ -public class ServerInfoFetcher { +public class ServerQueryInfoFetcher { private PinotHelixResourceManager _pinotHelixResourceManager; - private Map _serverInfoCache; + private Map _cache; - public ServerInfoFetcher(PinotHelixResourceManager pinotHelixResourceManager) { + public ServerQueryInfoFetcher(PinotHelixResourceManager pinotHelixResourceManager) { _pinotHelixResourceManager = pinotHelixResourceManager; - _serverInfoCache = new HashMap<>(); + _cache = new HashMap<>(); } - public ServerInfo getServerInfo(String instanceId) { - return _serverInfoCache.computeIfAbsent(instanceId, this::getServerInfoOndemand); + public ServerQueryInfo getServerQueryInfo(String instanceId) { + return _cache.computeIfAbsent(instanceId, this::getServerQueryInfoOndemand); } - private ServerInfo getServerInfoOndemand(String instanceId) { + private ServerQueryInfo getServerQueryInfoOndemand(String instanceId) { InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId); if (instanceConfig == null || !InstanceTypeUtils.isServer(instanceId)) { return null; @@ -56,17 +56,17 @@ private ServerInfo getServerInfoOndemand(String instanceId) { InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), false); boolean queriesDisabled = record.getBooleanField(CommonConstants.Helix.QUERIES_DISABLED, false); boolean shutdownInProgress = record.getBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, false); - return new ServerInfo(instanceId, tags, null, helixEnabled, queriesDisabled, shutdownInProgress); + return new ServerQueryInfo(instanceId, tags, null, helixEnabled, queriesDisabled, shutdownInProgress); } - public static class ServerInfo { + public static class ServerQueryInfo { private String _instanceName; private List _tags; private List _tables; private boolean _helixEnabled; private boolean _queriesDisabled; private boolean _shutdownInProgress; - private ServerInfo(String instanceName, + private ServerQueryInfo(String instanceName, List tags, List tables, boolean helixEnabled, From d49e8ab8682471aed28ffb72b65077d46f7bd87e Mon Sep 17 00:00:00 2001 From: Mingqiang Liang Date: Tue, 7 Jan 2025 10:48:41 -0800 Subject: [PATCH 11/12] address comments --- .../helix/SegmentStatusChecker.java | 4 +- .../util/ServerQueryInfoFetcher.java | 68 +++++-------------- 2 files changed, 18 insertions(+), 54 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 5ea13507d9a2..34be96e9fa02 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -343,8 +343,8 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon for (Map.Entry entry : stateMap.entrySet()) { String serverInstanceId = entry.getKey(); String segmentState = entry.getValue(); - if (isServerQueryable(serverQueryInfoFetcher.getServerQueryInfo(serverInstanceId)) - && (segmentState.equals(SegmentStateModel.ONLINE) || segmentState.equals(SegmentStateModel.CONSUMING))) { + if ((segmentState.equals(SegmentStateModel.ONLINE) || segmentState.equals(SegmentStateModel.CONSUMING)) + && isServerQueryable(serverQueryInfoFetcher.getServerQueryInfo(serverInstanceId))) { numEVReplicasUp++; } if (segmentState.equals(SegmentStateModel.ERROR)) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerQueryInfoFetcher.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerQueryInfoFetcher.java index 819e64087a37..2ac53ae508e3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerQueryInfoFetcher.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerQueryInfoFetcher.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.helix.model.InstanceConfig; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; @@ -33,18 +34,20 @@ * repeated ZK access. This class is NOT thread-safe. */ public class ServerQueryInfoFetcher { - private PinotHelixResourceManager _pinotHelixResourceManager; - private Map _cache; + private final PinotHelixResourceManager _pinotHelixResourceManager; + private final Map _cache; public ServerQueryInfoFetcher(PinotHelixResourceManager pinotHelixResourceManager) { _pinotHelixResourceManager = pinotHelixResourceManager; _cache = new HashMap<>(); } + @Nullable public ServerQueryInfo getServerQueryInfo(String instanceId) { return _cache.computeIfAbsent(instanceId, this::getServerQueryInfoOndemand); } + @Nullable private ServerQueryInfo getServerQueryInfoOndemand(String instanceId) { InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId); if (instanceConfig == null || !InstanceTypeUtils.isServer(instanceId)) { @@ -52,26 +55,23 @@ private ServerQueryInfo getServerQueryInfoOndemand(String instanceId) { } List tags = instanceConfig.getTags(); ZNRecord record = instanceConfig.getRecord(); - boolean helixEnabled = record.getBooleanField( - InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), false); + boolean helixEnabled = instanceConfig.getInstanceEnabled(); boolean queriesDisabled = record.getBooleanField(CommonConstants.Helix.QUERIES_DISABLED, false); boolean shutdownInProgress = record.getBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, false); + return new ServerQueryInfo(instanceId, tags, null, helixEnabled, queriesDisabled, shutdownInProgress); } public static class ServerQueryInfo { - private String _instanceName; - private List _tags; - private List _tables; - private boolean _helixEnabled; - private boolean _queriesDisabled; - private boolean _shutdownInProgress; - private ServerQueryInfo(String instanceName, - List tags, - List tables, - boolean helixEnabled, - boolean queriesDisabled, - boolean shutdownInProgress) { + private final String _instanceName; + private final List _tags; + private final List _tables; + private final boolean _helixEnabled; + private final boolean _queriesDisabled; + private final boolean _shutdownInProgress; + + private ServerQueryInfo(String instanceName, List tags, List tables, boolean helixEnabled, + boolean queriesDisabled, boolean shutdownInProgress) { _instanceName = instanceName; _tags = tags; _tables = tables; @@ -80,52 +80,16 @@ private ServerQueryInfo(String instanceName, _shutdownInProgress = shutdownInProgress; } - public String getInstanceName() { - return _instanceName; - } - - public void setInstanceName(String instanceName) { - _instanceName = instanceName; - } - - public List getTags() { - return _tags; - } - - public void setTags(List tags) { - _tags = tags; - } - - public List getTables() { - return _tables; - } - - public void setTables(List tables) { - _tables = tables; - } - public boolean isHelixEnabled() { return _helixEnabled; } - public void setHelixEnabled(boolean helixEnabled) { - _helixEnabled = helixEnabled; - } - public boolean isQueriesDisabled() { return _queriesDisabled; } - public void setQueriesDisabled(boolean queriesDisabled) { - _queriesDisabled = queriesDisabled; - } - public boolean isShutdownInProgress() { return _shutdownInProgress; } - - public void setShutdownInProgress(boolean shutdownInProgress) { - _shutdownInProgress = shutdownInProgress; - } } } From 9007e0a9ca136caef08c44407cc498f5a9c3de56 Mon Sep 17 00:00:00 2001 From: Mingqiang Liang Date: Thu, 9 Jan 2025 21:58:37 -0800 Subject: [PATCH 12/12] don't explicitly enable an instance in testing --- .../java/org/apache/pinot/integration/tests/ClusterTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index e0f7a6f74f14..1338e9f529d3 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -259,9 +259,7 @@ protected void startServers(int numServers) throws Exception { FileUtils.deleteQuietly(new File(TEMP_SERVER_DIR)); for (int i = 0; i < numServers; i++) { - BaseServerStarter serverStarter = startOneServer(i); - _serverStarters.add(serverStarter); - _helixAdmin.enableInstance(getHelixClusterName(), serverStarter.getInstanceId(), true); + _serverStarters.add(startOneServer(i)); } assertEquals(System.getProperty("user.timezone"), "UTC"); }