Skip to content

Commit

Permalink
minor refactoring to make code reusable
Browse files Browse the repository at this point in the history
  • Loading branch information
mqliang committed Nov 26, 2024
1 parent bf098d3 commit 4c5224a
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
Expand All @@ -53,6 +54,7 @@
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
Expand Down Expand Up @@ -110,17 +112,7 @@ protected Context preprocess(Properties periodicTaskProperties) {
context._logDisabledTables = true;
_lastDisabledTableLogTimestamp = now;
}

// Read ZK once to build a set of queryable server instances
for (InstanceConfig instanceConfig : _pinotHelixResourceManager.getAllServerInstanceConfigs()) {
ZNRecord record = instanceConfig.getRecord();
boolean queriesDisabled = Boolean.valueOf(record.getSimpleField("queriesDisabled"));
boolean shutdownInProgress = Boolean.valueOf(record.getSimpleField("shutdownInProgress"));
if (!queriesDisabled && !shutdownInProgress) {
context._queryableServers.add(instanceConfig.getInstanceName());
}
}

context._queryableServers.addAll(_pinotHelixResourceManager.getAllQueryableServers());
return context;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,21 @@ public List<InstanceConfig> getAllMinionInstanceConfigs() {
.filter(instance -> InstanceTypeUtils.isMinion(instance.getId())).collect(Collectors.toList());
}

public Set<String> getAllQueryableServers() {
return getAllServerInstanceConfigs()
.stream()
.filter(instanceConfig -> {
ZNRecord record = instanceConfig.getRecord();
boolean helixEnabled = Boolean.valueOf(record.getSimpleField(
InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name()));
boolean queriesDisabled = Boolean.valueOf(record.getSimpleField(Helix.QUERIES_DISABLED));
boolean shutdownInProgress = Boolean.valueOf(record.getSimpleField(Helix.IS_SHUTDOWN_IN_PROGRESS));
return helixEnabled && !queriesDisabled && !shutdownInProgress;
})
.map(InstanceConfig::getInstanceName)
.collect(Collectors.toSet());
}

/**
* Get all instances with the given tag
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.helix.AccessOption;
import org.apache.helix.model.ExternalView;
Expand Down Expand Up @@ -107,13 +108,7 @@ public void offlineBasicTest() {
externalView.setState("myTable_4", "pinot1", "ONLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
when(resourceManager.getAllQueryableServers()).thenReturn(Set.of("pinot1", "pinot2", "pinot3"));
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -224,13 +219,7 @@ public void realtimeBasicTest() {
externalView.setState(seg3, "pinot3", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
when(resourceManager.getAllQueryableServers()).thenReturn(Set.of("pinot1", "pinot2", "pinot3"));
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -296,13 +285,7 @@ public void realtimeMutableSegmentHasLessReplicaTest() {
externalView.setState(seg3, "pinot4", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
when(resourceManager.getAllQueryableServers()).thenReturn(Set.of("pinot1", "pinot2", "pinot3", "pinot4"));
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -368,13 +351,7 @@ public void realtimeServerNotQueryableTest() {
externalView.setState(seg3, "Server_pinot4", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
newQueryDisabledInstanceConfig("Server_pinot1"),
newShutdownInProgressInstanceConfig("Server_pinot2"),
new InstanceConfig("Server_pinot3"),
new InstanceConfig("Server_pinot4"))
);
when(resourceManager.getAllQueryableServers()).thenReturn(Set.of("Server_pinot3", "Server_pinot4"));
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand All @@ -399,13 +376,21 @@ public void realtimeServerNotQueryableTest() {

private InstanceConfig newQueryDisabledInstanceConfig(String instanceName) {
ZNRecord znRecord = new ZNRecord(instanceName);
znRecord.setSimpleField("queriesDisabled", "true");
znRecord.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), "true");
znRecord.setSimpleField(CommonConstants.Helix.QUERIES_DISABLED, "true");
return new InstanceConfig(znRecord);
}

private InstanceConfig newShutdownInProgressInstanceConfig(String instanceName) {
ZNRecord znRecord = new ZNRecord(instanceName);
znRecord.setSimpleField("shutdownInProgress", "true");
znRecord.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), "true");
znRecord.setSimpleField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, "true");
return new InstanceConfig(znRecord);
}

private InstanceConfig newQuerableInstanceConfig(String instanceName) {
ZNRecord znRecord = new ZNRecord(instanceName);
znRecord.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), "true");
return new InstanceConfig(znRecord);
}

Expand Down Expand Up @@ -452,13 +437,7 @@ public void realtimeImmutableSegmentHasLessReplicaTest() {
externalView.setState(seg3, "pinot4", "OFFLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
when(resourceManager.getAllQueryableServers()).thenReturn(Set.of("pinot1", "pinot2", "pinot3", "pinot4"));
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand Down Expand Up @@ -525,13 +504,7 @@ public void missingEVPartitionTest() {
externalView.setState("myTable_1", "pinot2", "ONLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
when(resourceManager.getAllQueryableServers()).thenReturn(Set.of("pinot1", "pinot2", "pinot3"));
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
Expand Down Expand Up @@ -622,13 +595,7 @@ public void missingEVPartitionPushTest() {
externalView.setState("myTable_2", "pinot1", "ONLINE");

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
when(resourceManager.getAllQueryableServers()).thenReturn(Set.of("pinot1", "pinot2"));
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
Expand Down Expand Up @@ -771,13 +738,7 @@ public void lessThanOnePercentSegmentsUnavailableTest() {
}

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllServerInstanceConfigs()).thenReturn(
List.of(
new InstanceConfig("pinot1"),
new InstanceConfig("pinot2"),
new InstanceConfig("pinot3"),
new InstanceConfig("pinot4"))
);
when(resourceManager.getAllQueryableServers()).thenReturn(Set.of("pinot1"));
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
Expand Down

0 comments on commit 4c5224a

Please sign in to comment.