Skip to content

Commit

Permalink
Fix ConfigNode Partition Metric NPE bug apache#14144
Browse files Browse the repository at this point in the history
Signed-off-by: OneSizeFitQuorum <tanxinyu@apache.org>
  • Loading branch information
OneSizeFitsQuorum authored Nov 21, 2024
1 parent 43ed865 commit 4a76dfb
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ private void invalidateSchemaCacheOfOldLeaders(
requestIndex.get(), dataNodeLocation);
// set req
final TConsensusGroupId consensusGroupId = entry.getKey();
final String database = getPartitionManager().getRegionStorageGroup(consensusGroupId);
final String database = getPartitionManager().getRegionDatabase(consensusGroupId);
invalidateSchemaCacheRequestHandler.putRequest(requestIndex.get(), database);
requestIndex.incrementAndGet();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1194,8 +1194,8 @@ public GetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) {
* @param regionId regionId
* @return database name
*/
public String getRegionStorageGroup(TConsensusGroupId regionId) {
return partitionInfo.getRegionStorageGroup(regionId);
public String getRegionDatabase(TConsensusGroupId regionId) {
return partitionInfo.getRegionDatabase(regionId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,8 @@ private static void bindDatabasePartitionMetricsWhenUpdate(
try {
return manager.getRegionGroupCount(database, TConsensusGroupType.SchemaRegion);
} catch (DatabaseNotExistsException e) {
LOGGER.warn("Error when counting SchemaRegionGroups in Database: {}", database, e);
return -1;
LOGGER.info("Error when counting SchemaRegionGroups in Database: {}", database, e);
return 0;
}
},
Tag.NAME.toString(),
Expand All @@ -361,8 +361,8 @@ private static void bindDatabasePartitionMetricsWhenUpdate(
try {
return manager.getRegionGroupCount(database, TConsensusGroupType.DataRegion);
} catch (DatabaseNotExistsException e) {
LOGGER.warn("Error when counting DataRegionGroups in Database: {}", database, e);
return -1;
LOGGER.info("Error when counting DataRegionGroups in Database: {}", database, e);
return 0;
}
},
Tag.NAME.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEven
.forEach(
(regionGroupId, pair) -> {
final String databaseName =
configManager.getPartitionManager().getRegionStorageGroup(regionGroupId);
configManager.getPartitionManager().getRegionDatabase(regionGroupId);
// Pipe only collect user's data, filter metric database here.
// DatabaseName may be null for config region group
if (Objects.isNull(databaseName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,16 +273,16 @@ public List<RegionMaintainTask> getRegionMaintainEntryList() {
}

/**
* Thread-safely pre-delete the specific StorageGroup.
* Thread-safely pre-delete the specific database.
*
* @param preDeleteDatabasePlan PreDeleteStorageGroupPlan
* @return {@link TSStatusCode#SUCCESS_STATUS}
*/
public TSStatus preDeleteDatabase(PreDeleteDatabasePlan preDeleteDatabasePlan) {
final PreDeleteDatabasePlan.PreDeleteType preDeleteType =
preDeleteDatabasePlan.getPreDeleteType();
final String storageGroup = preDeleteDatabasePlan.getStorageGroup();
DatabasePartitionTable databasePartitionTable = databasePartitionTables.get(storageGroup);
final String database = preDeleteDatabasePlan.getStorageGroup();
DatabasePartitionTable databasePartitionTable = databasePartitionTables.get(database);
if (databasePartitionTable == null) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
Expand All @@ -305,12 +305,12 @@ public boolean isDatabasePreDeleted(String database) {
}

/**
* Thread-safely delete StorageGroup.
* Thread-safely delete database.
*
* @param plan DeleteStorageGroupPlan
* @param plan DeleteDatabasePlan
*/
public void deleteDatabase(DeleteDatabasePlan plan) {
// Clean the StorageGroupTable cache
// Clean the databaseTable cache
databasePartitionTables.remove(plan.getName());
}

Expand All @@ -325,24 +325,24 @@ public DataSet getSchemaPartition(GetSchemaPartitionPlan plan) {
// TODO: Replace this map with new SchemaPartition
Map<String, SchemaPartitionTable> schemaPartition = new ConcurrentHashMap<>();

if (plan.getPartitionSlotsMap().size() == 0) {
if (plan.getPartitionSlotsMap().isEmpty()) {
// Return all SchemaPartitions when the queried PartitionSlots are empty
databasePartitionTables.forEach(
(storageGroup, databasePartitionTable) -> {
(database, databasePartitionTable) -> {
if (databasePartitionTable.isNotPreDeleted()) {
schemaPartition.put(storageGroup, new SchemaPartitionTable());
schemaPartition.put(database, new SchemaPartitionTable());

databasePartitionTable.getSchemaPartition(
new ArrayList<>(), schemaPartition.get(storageGroup));
new ArrayList<>(), schemaPartition.get(database));

if (schemaPartition.get(storageGroup).getSchemaPartitionMap().isEmpty()) {
if (schemaPartition.get(database).getSchemaPartitionMap().isEmpty()) {
// Remove empty Map
schemaPartition.remove(storageGroup);
schemaPartition.remove(database);
}
}
});
} else {
// Return the SchemaPartition for each StorageGroup
// Return the SchemaPartition for each database
plan.getPartitionSlotsMap()
.forEach(
(database, partitionSlots) -> {
Expand Down Expand Up @@ -506,16 +506,16 @@ public DataSet getSchemaNodeManagementPartition(List<String> matchedDatabases) {
matchedDatabases.stream()
.filter(this::isDatabaseExisted)
.forEach(
storageGroup -> {
schemaPartitionMap.put(storageGroup, new SchemaPartitionTable());
database -> {
schemaPartitionMap.put(database, new SchemaPartitionTable());

databasePartitionTables
.get(storageGroup)
.getSchemaPartition(new ArrayList<>(), schemaPartitionMap.get(storageGroup));
.get(database)
.getSchemaPartition(new ArrayList<>(), schemaPartitionMap.get(database));

if (schemaPartitionMap.get(storageGroup).getSchemaPartitionMap().isEmpty()) {
if (schemaPartitionMap.get(database).getSchemaPartitionMap().isEmpty()) {
// Remove empty Map
schemaPartitionMap.remove(storageGroup);
schemaPartitionMap.remove(database);
}
});

Expand All @@ -534,10 +534,10 @@ public DataSet getRegionInfoList(GetRegionInfoListPlan regionsInfoPlan) {
return regionResp;
}
TShowRegionReq showRegionReq = regionsInfoPlan.getShowRegionReq();
final List<String> storageGroups = showRegionReq != null ? showRegionReq.getDatabases() : null;
final List<String> databases = showRegionReq != null ? showRegionReq.getDatabases() : null;
databasePartitionTables.forEach(
(storageGroup, databasePartitionTable) -> {
if (storageGroups != null && !storageGroups.contains(storageGroup)) {
(database, databasePartitionTable) -> {
if (databases != null && !databases.contains(database)) {
return;
}
regionInfoList.addAll(databasePartitionTable.getRegionInfoList(regionsInfoPlan));
Expand Down Expand Up @@ -602,7 +602,7 @@ public TSStatus removeRegionLocation(RemoveRegionLocationPlan req) {
* @param regionId regionId
* @return database name
*/
public String getRegionStorageGroup(TConsensusGroupId regionId) {
public String getRegionDatabase(TConsensusGroupId regionId) {
Optional<DatabasePartitionTable> sgPartitionTableOptional =
databasePartitionTables.values().stream()
.filter(s -> s.containRegionGroup(regionId))
Expand All @@ -617,9 +617,9 @@ public String getRegionStorageGroup(TConsensusGroupId regionId) {
/**
* Only Leader use this interface. Filter unassigned SchemaPartitionSlots.
*
* @param partitionSlotsMap Map<StorageGroupName, List<TSeriesPartitionSlot>>
* @return Map<StorageGroupName, List<TSeriesPartitionSlot>>, SchemaPartitionSlots that is not
* assigned in partitionSlotsMap
* @param partitionSlotsMap Map<database, List<TSeriesPartitionSlot>>
* @return Map<database, List<TSeriesPartitionSlot>>, SchemaPartitionSlots that is not assigned in
* partitionSlotsMap
*/
public Map<String, List<TSeriesPartitionSlot>> filterUnassignedSchemaPartitionSlots(
Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap) {
Expand All @@ -642,9 +642,9 @@ public Map<String, List<TSeriesPartitionSlot>> filterUnassignedSchemaPartitionSl
/**
* Only Leader use this interface. Filter unassigned SchemaPartitionSlots
*
* @param partitionSlotsMap Map<StorageGroupName, Map<TSeriesPartitionSlot, TTimeSlotList>>
* @return Map<StorageGroupName, Map<TSeriesPartitionSlot, TTimeSlotList>>, DataPartitionSlots
* that is not assigned in partitionSlotsMap
* @param partitionSlotsMap Map<database, Map<TSeriesPartitionSlot, TTimeSlotList>>
* @return Map<database, Map<TSeriesPartitionSlot, TTimeSlotList>>, DataPartitionSlots that is not
* assigned in partitionSlotsMap
*/
public Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> filterUnassignedDataPartitionSlots(
Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap) {
Expand Down Expand Up @@ -806,8 +806,8 @@ public int countDataNodeScatterWidth(
*
* @param database DatabaseName
* @param type SchemaRegion or DataRegion
* @return Number of Regions currently owned by the specific StorageGroup
* @throws DatabaseNotExistsException When the specific StorageGroup doesn't exist
* @return Number of Regions currently owned by the specific database
* @throws DatabaseNotExistsException When the specific database doesn't exist
*/
public int getRegionGroupCount(String database, TConsensusGroupType type)
throws DatabaseNotExistsException {
Expand Down Expand Up @@ -865,7 +865,9 @@ public List<TConsensusGroupId> getAllRegionGroupIds(String database, TConsensusG
* @return The assigned SeriesPartitionSlots count
*/
public int getAssignedSeriesPartitionSlotsCount(String database) {
return databasePartitionTables.get(database).getAssignedSeriesPartitionSlotsCount();
return Optional.ofNullable(databasePartitionTables.get(database))
.map(DatabasePartitionTable::getAssignedSeriesPartitionSlotsCount)
.orElse(0);
}

/**
Expand All @@ -877,13 +879,15 @@ public int getAssignedSeriesPartitionSlotsCount(String database) {
* @return The assigned TimePartitionSlots count
*/
public long getAssignedTimePartitionSlotsCount(String database) {
return databasePartitionTables.get(database).getTimeSlotCount();
return Optional.ofNullable(databasePartitionTables.get(database))
.map(DatabasePartitionTable::getTimeSlotCount)
.orElse(0L);
}

/**
* Get the DataNodes who contain the specific StorageGroup's Schema or Data.
* Get the DataNodes who contain the specific database's Schema or Data.
*
* @param database The specific StorageGroup's name
* @param database The specific database's name
* @param type SchemaRegion or DataRegion
* @return Set {@literal <}TDataNodeLocation{@literal >}, the related DataNodes
*/
Expand All @@ -897,7 +901,7 @@ public Set<TDataNodeLocation> getDatabaseRelatedDataNodes(
*
* @param database DatabaseName
* @param type SchemaRegion or DataRegion
* @return The StorageGroup's Running or Available Regions that sorted by the number of allocated
* @return The database's Running or Available Regions that sorted by the number of allocated
* slots
*/
public List<Pair<Long, TConsensusGroupId>> getRegionGroupSlotsCounter(
Expand Down Expand Up @@ -955,12 +959,12 @@ public boolean processTakeSnapshot(File snapshotDir) throws TException, IOExcept
// serialize nextRegionGroupId
ReadWriteIOUtils.write(nextRegionGroupId.get(), bufferedOutputStream);

// serialize StorageGroupPartitionTable
// serialize databasePartitionTable
ReadWriteIOUtils.write(databasePartitionTables.size(), bufferedOutputStream);
for (Map.Entry<String, DatabasePartitionTable> storageGroupPartitionTableEntry :
for (Map.Entry<String, DatabasePartitionTable> databasePartitionTableEntry :
databasePartitionTables.entrySet()) {
ReadWriteIOUtils.write(storageGroupPartitionTableEntry.getKey(), bufferedOutputStream);
storageGroupPartitionTableEntry.getValue().serialize(bufferedOutputStream, protocol);
ReadWriteIOUtils.write(databasePartitionTableEntry.getKey(), bufferedOutputStream);
databasePartitionTableEntry.getValue().serialize(bufferedOutputStream, protocol);
}

// serialize regionCleanList
Expand Down Expand Up @@ -1012,17 +1016,16 @@ public void processLoadSnapshot(final File snapshotDir) throws TException, IOExc
// start to restore
nextRegionGroupId.set(ReadWriteIOUtils.readInt(fileInputStream));

// restore StorageGroupPartitionTable
// restore databasePartitionTable
int length = ReadWriteIOUtils.readInt(fileInputStream);
for (int i = 0; i < length; i++) {
final String storageGroup = ReadWriteIOUtils.readString(fileInputStream);
if (storageGroup == null) {
throw new IOException("Failed to load snapshot because get null StorageGroup name");
final String database = ReadWriteIOUtils.readString(fileInputStream);
if (database == null) {
throw new IOException("Failed to load snapshot because get null database name");
}
final DatabasePartitionTable databasePartitionTable =
new DatabasePartitionTable(storageGroup);
final DatabasePartitionTable databasePartitionTable = new DatabasePartitionTable(database);
databasePartitionTable.deserialize(fileInputStream, protocol);
databasePartitionTables.put(storageGroup, databasePartitionTable);
databasePartitionTables.put(database, databasePartitionTable);
}

// restore deletedRegionSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocatio
currentPeerNodes = Collections.emptyList();
}

String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes, storageGroup);
String database = configManager.getPartitionManager().getRegionDatabase(regionId);
TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes, database);

status =
(TSStatus)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
.forEach(
(regionGroupId, regionLeaderNodeId) -> {
final String databaseName =
env.getConfigManager().getPartitionManager().getRegionStorageGroup(regionGroupId);
env.getConfigManager().getPartitionManager().getRegionDatabase(regionGroupId);
final PipeTaskMeta currentPipeTaskMeta =
currentConsensusGroupId2PipeTaskMeta.get(regionGroupId.getId());
if (databaseName != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,7 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
.forEach(
(regionGroupId, regionLeaderNodeId) -> {
final String databaseName =
env.getConfigManager()
.getPartitionManager()
.getRegionStorageGroup(regionGroupId);
env.getConfigManager().getPartitionManager().getRegionDatabase(regionGroupId);
if (databaseName != null
&& !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
&& !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".")) {
Expand Down

0 comments on commit 4a76dfb

Please sign in to comment.