Skip to content

Commit

Permalink
check if helix enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
mqliang committed Nov 26, 2024
1 parent bf098d3 commit d5d5973
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
Expand Down Expand Up @@ -114,9 +115,11 @@ protected Context preprocess(Properties periodicTaskProperties) {
// Read ZK once to build a set of queryable server instances
for (InstanceConfig instanceConfig : _pinotHelixResourceManager.getAllServerInstanceConfigs()) {
ZNRecord record = instanceConfig.getRecord();
boolean queriesDisabled = Boolean.valueOf(record.getSimpleField("queriesDisabled"));
boolean shutdownInProgress = Boolean.valueOf(record.getSimpleField("shutdownInProgress"));
if (!queriesDisabled && !shutdownInProgress) {
boolean helixEnabled = record.getBooleanField(
InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), false);
boolean queriesDisabled = record.getBooleanField(Helix.QUERIES_DISABLED, false);
boolean shutdownInProgress = record.getBooleanField(Helix.IS_SHUTDOWN_IN_PROGRESS, false);
if (helixEnabled && !queriesDisabled && !shutdownInProgress) {
context._queryableServers.add(instanceConfig.getInstanceName());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ public void offlineBasicTest() {
PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
newQuerableInstanceConfig("pinot1"),
newQuerableInstanceConfig("pinot2"),
newQuerableInstanceConfig("pinot3")
)
);
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
Expand Down Expand Up @@ -226,10 +226,9 @@ public void realtimeBasicTest() {
PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
newQuerableInstanceConfig("pinot1"),
newQuerableInstanceConfig("pinot2"),
newQuerableInstanceConfig("pinot3"))
);
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
Expand Down Expand Up @@ -298,10 +297,11 @@ public void realtimeMutableSegmentHasLessReplicaTest() {
PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
newQuerableInstanceConfig("pinot1"),
newQuerableInstanceConfig("pinot2"),
newQuerableInstanceConfig("pinot3"),
newQuerableInstanceConfig("pinot4")
)
);
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
Expand Down Expand Up @@ -372,8 +372,9 @@ public void realtimeServerNotQueryableTest() {
List.of(
newQueryDisabledInstanceConfig("Server_pinot1"),
newShutdownInProgressInstanceConfig("Server_pinot2"),
new InstanceConfig("Server_pinot3"),
new InstanceConfig("Server_pinot4"))
newQuerableInstanceConfig("Server_pinot3"),
newQuerableInstanceConfig("Server_pinot4")
)
);
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
Expand All @@ -399,13 +400,21 @@ public void realtimeServerNotQueryableTest() {

private InstanceConfig newQueryDisabledInstanceConfig(String instanceName) {
ZNRecord znRecord = new ZNRecord(instanceName);
znRecord.setSimpleField("queriesDisabled", "true");
znRecord.setBooleanField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), true);
znRecord.setBooleanField(CommonConstants.Helix.QUERIES_DISABLED, true);
return new InstanceConfig(znRecord);
}

private InstanceConfig newShutdownInProgressInstanceConfig(String instanceName) {
ZNRecord znRecord = new ZNRecord(instanceName);
znRecord.setSimpleField("shutdownInProgress", "true");
znRecord.setBooleanField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), true);
znRecord.setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, true);
return new InstanceConfig(znRecord);
}

private InstanceConfig newQuerableInstanceConfig(String instanceName) {
ZNRecord znRecord = new ZNRecord(instanceName);
znRecord.setBooleanField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), true);
return new InstanceConfig(znRecord);
}

Expand Down Expand Up @@ -454,10 +463,11 @@ public void realtimeImmutableSegmentHasLessReplicaTest() {
PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
newQuerableInstanceConfig("pinot1"),
newQuerableInstanceConfig("pinot2"),
newQuerableInstanceConfig("pinot3"),
newQuerableInstanceConfig("pinot4")
)
);
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
Expand Down Expand Up @@ -527,10 +537,10 @@ public void missingEVPartitionTest() {
PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
newQuerableInstanceConfig("pinot1"),
newQuerableInstanceConfig("pinot2"),
newQuerableInstanceConfig("pinot3")
)
);
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -624,10 +634,9 @@ public void missingEVPartitionPushTest() {
PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
newQuerableInstanceConfig("pinot1"),
newQuerableInstanceConfig("pinot2")
)
);
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -772,12 +781,7 @@ public void lessThanOnePercentSegmentsUnavailableTest() {

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
List.of(newQuerableInstanceConfig("pinot1")));
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
Expand Down

0 comments on commit d5d5973

Please sign in to comment.