diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index 9422fc64a6eb..807c5835d6fd 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -891,6 +891,31 @@ public String getTableReloadMetadata( } } + @GET + @Path("segments/{tableNameWithType}/isStale") + @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action = Actions.Table.GET_METADATA) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Gets a list of segments that are stale from servers hosting the table", + notes = "Gets a list of segments that are stale from servers hosting the table") + public Map getStaleSegments( + @ApiParam(value = "Table name with type", required = true, example = "myTable_REALTIME") + @PathParam("tableNameWithType") String tableNameWithType, @Context HttpHeaders headers) { + tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers); + LOGGER.info("Received a request to check for segments requiring a refresh from all servers hosting segments for " + + "table {}", tableNameWithType); + try { + TableMetadataReader tableMetadataReader = + new TableMetadataReader(_executor, _connectionManager, _pinotHelixResourceManager); + return tableMetadataReader.getStaleSegments(tableNameWithType, + _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); + } catch (InvalidConfigException e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Status.BAD_REQUEST); + } catch (IOException ioe) { + throw new ControllerApplicationException(LOGGER, "Error parsing Pinot server response: " + ioe.getMessage(), + Status.INTERNAL_SERVER_ERROR, ioe); + } + } + @GET @Path("segments/{tableName}/zkmetadata") @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_METADATA) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableStaleSegmentResponse.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableStaleSegmentResponse.java new file mode 100644 index 000000000000..eead74dd8f41 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableStaleSegmentResponse.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import org.apache.pinot.segment.local.data.manager.StaleSegment; + + +public class TableStaleSegmentResponse { + private final List _staleSegmentList; + private final boolean _isValidResponse; + private final String _errorMessage; + + @JsonCreator + public TableStaleSegmentResponse(@JsonProperty("staleSegmentList") List staleSegmentList, + @JsonProperty("validResponse") boolean isValidResponse, + @JsonProperty("errorMessage") String errorMessage) { + _staleSegmentList = staleSegmentList; + _isValidResponse = isValidResponse; + _errorMessage = errorMessage; + } + + public TableStaleSegmentResponse(List staleSegmentList) { + _staleSegmentList = staleSegmentList; + _isValidResponse = true; + _errorMessage = null; + } + + public TableStaleSegmentResponse(String errorMessage) { + _staleSegmentList = null; + _isValidResponse = false; + _errorMessage = errorMessage; + } + + @JsonProperty + public List getStaleSegmentList() { + return _staleSegmentList; + } + + @JsonProperty + public boolean isValidResponse() { + return _isValidResponse; + } + + @JsonProperty + public String getErrorMessage() { + return _errorMessage; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java index 781140a978ba..8dde7f08fee0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java @@ -47,6 +47,8 @@ import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse; import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo; import org.apache.pinot.common.utils.RoaringBitmapUtils; +import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse; +import org.apache.pinot.segment.local.data.manager.StaleSegment; import org.apache.pinot.spi.utils.JsonUtils; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.client.ClientProperties; @@ -397,6 +399,34 @@ public ValidDocIdsBitmapResponse getValidDocIdsBitmapFromServer(String tableName return response; } + public Map getStaleSegmentsFromServer( + String tableNameWithType, Set serverInstances, BiMap endpoints, int timeoutMs) { + LOGGER.debug("Getting list of segments for refresh from servers for table {}.", tableNameWithType); + List serverURLs = new ArrayList<>(); + for (String serverInstance : serverInstances) { + serverURLs.add(generateStaleSegmentsServerURL(tableNameWithType, endpoints.get(serverInstance))); + } + BiMap endpointsToServers = endpoints.inverse(); + CompletionServiceHelper completionServiceHelper = + new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers); + CompletionServiceHelper.CompletionServiceResponse serviceResponse = + completionServiceHelper.doMultiGetRequest(serverURLs, tableNameWithType, false, timeoutMs); + Map serverResponses = new HashMap<>(); + + for (Map.Entry streamResponse : serviceResponse._httpResponses.entrySet()) { + try { + List staleSegments = JsonUtils.stringToObject(streamResponse.getValue(), + new TypeReference>() { }); + serverResponses.put(streamResponse.getKey(), new TableStaleSegmentResponse(staleSegments)); + } catch (Exception e) { + serverResponses.put(streamResponse.getKey(), new TableStaleSegmentResponse(e.getMessage())); + LOGGER.error("Unable to parse server {} response for needRefresh for table {} due to an error: ", + streamResponse.getKey(), tableNameWithType, e); + } + } + return serverResponses; + } + private String generateAggregateSegmentMetadataServerURL(String tableNameWithType, List columns, String endpoint) { tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8); @@ -470,4 +500,9 @@ private String generateColumnsParam(List columns) { paramsStr = String.join("&", params); return paramsStr; } + + private String generateStaleSegmentsServerURL(String tableNameWithType, String endpoint) { + tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8); + return String.format("%s/tables/%s/segments/isStale", endpoint, tableNameWithType); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java index c9e87b396b53..48f53577a840 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java @@ -33,6 +33,7 @@ import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.restlet.resources.TableMetadataInfo; import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo; +import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.JsonUtils; @@ -199,4 +200,17 @@ public JsonNode getAggregateValidDocIdsMetadata(String tableNameWithType, List getStaleSegments(String tableNameWithType, + int timeoutMs) + throws InvalidConfigException, IOException { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + List serverInstances = _pinotHelixResourceManager.getServerInstancesForTable(tableNameWithType, tableType); + Set serverInstanceSet = new HashSet<>(serverInstances); + BiMap endpoints = _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverInstanceSet); + ServerSegmentMetadataReader serverSegmentMetadataReader = + new ServerSegmentMetadataReader(_executor, _connectionManager); + return serverSegmentMetadataReader.getStaleSegmentsFromServer(tableNameWithType, serverInstanceSet, endpoints, + timeoutMs); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 10ff609b44b1..de9299bb334d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -30,6 +30,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -40,6 +42,7 @@ import java.util.concurrent.locks.Lock; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; @@ -59,25 +62,40 @@ import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; import org.apache.pinot.core.util.PeerServerSegmentFinder; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.StaleSegment; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; +import org.apache.pinot.segment.local.startree.StarTreeBuilderUtils; +import org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig; import org.apache.pinot.segment.local.utils.SegmentLocks; +import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.SegmentContext; import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.index.FieldIndexConfigs; +import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil; +import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.segment.spi.index.startree.StarTreeV2; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; +import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.apache.pinot.segment.spi.store.SegmentDirectory; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.StarTreeIndexConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; @@ -1046,6 +1064,276 @@ public boolean needReloadSegments() return needReload; } + @Override + public List getStaleSegments(TableConfig tableConfig, Schema schema) { + List staleSegments = new ArrayList<>(); + List segmentDataManagers = acquireAllSegments(); + try { + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + StaleSegment response = isSegmentStale(tableConfig, schema, segmentDataManager); + if (response.isStale()) { + staleSegments.add(response); + } + } + } finally { + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + releaseSegment(segmentDataManager); + } + } + + return staleSegments; + } + + protected StaleSegment isSegmentStale(TableConfig tableConfig, Schema schema, + SegmentDataManager segmentDataManager) { + String tableNameWithType = tableConfig.getTableName(); + Map indexConfigsMap = + FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema); + + String segmentName = segmentDataManager.getSegmentName(); + IndexSegment segment = segmentDataManager.getSegment(); + SegmentMetadata segmentMetadata = segment.getSegmentMetadata(); + Set segmentPhysicalColumns = segment.getPhysicalColumnNames(); + + // Time column changed + String timeColumn = tableConfig.getValidationConfig().getTimeColumnName(); + if (timeColumn != null) { + if (segmentMetadata.getTimeColumn() == null || !segmentMetadata.getTimeColumn().equals(timeColumn)) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: time column", tableNameWithType, segmentName); + return new StaleSegment(segmentName, true, "time column"); + } + } + + List sortedColumns = tableConfig.getIndexingConfig().getSortedColumn(); + String sortedColumn = CollectionUtils.isNotEmpty(sortedColumns) ? sortedColumns.get(0) : null; + + String partitionColumn = null; + ColumnPartitionConfig partitionConfig = null; + SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig(); + // NOTE: Partition can only be enabled on a single column + if (segmentPartitionConfig != null && segmentPartitionConfig.getColumnPartitionMap().size() == 1) { + Map.Entry entry = + segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next(); + partitionColumn = entry.getKey(); + partitionConfig = entry.getValue(); + } + + Set columnsInSegment = segmentMetadata.getAllColumns(); + + // Column is added + if (!columnsInSegment.containsAll(schema.getPhysicalColumnNames())) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: column added", tableNameWithType, segmentName); + return new StaleSegment(segmentName, true, "column added"); + } + + // Get Index configuration for the Table Config + Set noDictionaryColumns = + FieldIndexConfigsUtil.columnsWithIndexDisabled(StandardIndexes.dictionary(), indexConfigsMap); + Set bloomFilters = + FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.bloomFilter(), indexConfigsMap); + Set jsonIndex = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.json(), indexConfigsMap); + Set invertedIndex = + FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.inverted(), indexConfigsMap); + Set nullValueVectorIndex = + FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.nullValueVector(), indexConfigsMap); + Set rangeIndex = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.range(), indexConfigsMap); + Set h3Indexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.h3(), indexConfigsMap); + Set fstIndexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.fst(), indexConfigsMap); + Set textIndexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.text(), indexConfigsMap); + List starTreeIndexConfigsFromTableConfig = + tableConfig.getIndexingConfig().getStarTreeIndexConfigs(); + + // Get the index configuration for StarTree index from segment metadata as JsonNode. + List starTreeIndexMetadata = segment.getStarTrees(); + + // Generate StarTree index builder config from the segment metadata. + List builderConfigFromSegmentMetadata = new ArrayList<>(); + if (starTreeIndexMetadata != null) { + for (StarTreeV2 starTreeV2 : starTreeIndexMetadata) { + builderConfigFromSegmentMetadata.add(StarTreeV2BuilderConfig.fromMetadata(starTreeV2.getMetadata())); + } + } + + // Generate StarTree index builder configs from the table config. + //TODO: RV This maybe using the wrong function. It is not using the table's schema + List builderConfigFromTableConfigs = + StarTreeBuilderUtils.generateBuilderConfigs(starTreeIndexConfigsFromTableConfig, + tableConfig.getIndexingConfig().isEnableDefaultStarTree(), segmentMetadata); + + // TODO: RV Test + // Check if there is a mismatch between the StarTree index builder configs from the table config and the segment + // metadata. + if (!StarTreeBuilderUtils.areStarTreeBuilderConfigListsEqual(builderConfigFromTableConfigs, + builderConfigFromSegmentMetadata)) { + return new StaleSegment(segmentName, true, "startree index"); + } + + for (String columnName : segmentPhysicalColumns) { + ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(columnName); + FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName); + DataSource source = segment.getDataSource(columnName); + Preconditions.checkNotNull(columnMetadata); + Preconditions.checkNotNull(source); + + // Column is deleted + if (fieldSpecInSchema == null) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: column deleted", + tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "column deleted: " + columnName); + } + + // Field type changed + if (columnMetadata.getFieldType().compareTo(fieldSpecInSchema.getFieldType()) != 0) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: field type", tableNameWithType, + columnName, segmentName); + return new StaleSegment(segmentName, true, "field type changed: " + columnName); + } + + // Data type changed + if (!columnMetadata.getDataType().equals(fieldSpecInSchema.getDataType())) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: data type", tableNameWithType, + columnName, segmentName); + return new StaleSegment(segmentName, true, "data type changed: " + columnName); + } + + // SV/MV changed + if (columnMetadata.isSingleValue() != fieldSpecInSchema.isSingleValueField()) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: single / multi value", + tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "single / multi value changed: " + columnName); + } + + // TODO: detect if an index changes from Dictionary to Variable Length Dictionary or vice versa. + // TODO: RV TEST + boolean colHasDictionary = columnMetadata.hasDictionary(); + // Encoding changed + if (colHasDictionary == noDictionaryColumns.contains(columnName)) { + // Check if dictionary update is needed + // 1. If the segment metadata has dictionary enabled and table has it disabled, its incompatible and refresh is + // needed. + // 2. If segment metadata has dictionary disabled, check if it has to be overridden. If not overridden, + // refresh is needed, since table has it enabled. + boolean incompatible = colHasDictionary || DictionaryIndexType.ignoreDictionaryOverride( + tableConfig.getIndexingConfig().isOptimizeDictionary(), + tableConfig.getIndexingConfig().isOptimizeDictionaryForMetrics(), + tableConfig.getIndexingConfig().getNoDictionarySizeRatioThreshold(), + tableConfig.getIndexingConfig().getNoDictionaryCardinalityRatioThreshold(), fieldSpecInSchema, + indexConfigsMap.get(columnName), columnMetadata.getCardinality(), columnMetadata.getTotalNumberOfEntries()); + if (incompatible) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: dictionary encoding,", + tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "dictionary encoding changed: " + columnName); + } else { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, no change as dictionary overrides applied to col: {}", + tableNameWithType, segmentName, columnName); + } + } + + // Sorted column not sorted + if (columnName.equals(sortedColumn) && !columnMetadata.isSorted()) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: sort column", tableNameWithType, + columnName, segmentName); + return new StaleSegment(segmentName, true, "sort column changed: " + columnName); + } + + if (Objects.isNull(source.getBloomFilter()) == bloomFilters.contains(columnName)) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: bloom filter changed", + tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "bloom filter changed: " + columnName); + } + + if (Objects.isNull(source.getJsonIndex()) == jsonIndex.contains(columnName)) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: json index changed", + tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "json index changed: " + columnName); + } + + if (Objects.isNull(source.getTextIndex()) == textIndexes.contains(columnName)) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: text index changed", + tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "text index changed: " + columnName); + } + + if (Objects.isNull(source.getFSTIndex()) == fstIndexes.contains(columnName)) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: fst index changed", + tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "fst index changed: " + columnName); + } + + if (Objects.isNull(source.getH3Index()) == h3Indexes.contains(columnName)) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: h3 index changed", + tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "hst index changed: " + columnName); + } + + // If a segment is sorted then it will automatically be given an inverted index and that overrides the + // TableConfig setting + if (columnMetadata.isSorted()) { + // If a column is sorted and does not have an inverted index but the table config does have an inverted index. + // But do not remove the inverted index from a sorted column even if the table config has no inverted index. + if (Objects.isNull(source.getInvertedIndex()) && invertedIndex.contains(columnName)) { + LOGGER.debug( + "tableNameWithType: {}, columnName: {}, segmentName: {}, change: inverted index added to sorted column", + tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "invert index added to sort column: " + columnName); + } + } else { + if ((Objects.isNull(source.getInvertedIndex())) == invertedIndex.contains(columnName)) { + LOGGER.debug( + "tableNameWithType: {}, columnName: {}, segmentName: {}, change: inverted index changed on unsorted " + + "column", + tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "inverted index changed on unsorted column: " + columnName); + } + } + + // If a column has a NVV Reader and the Table Config says that it should not, then the NVV Reader can be removed. + // BUT if a column does NOT have a NVV Reader it cannot be added after the segment is created. So, for this check + // only check to see if an existing NVV Reader should be removed, but do not check if an NVV Reader needs to be + // added. + if (!Objects.isNull(source.getNullValueVector()) && !nullValueVectorIndex.contains(columnName)) { + LOGGER.debug( + "tableNameWithType: {}, columnName: {}, segmentName: {}, change: null value vector index removed from " + + "column and cannot be added back to this segment.", tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "null value vector index removed from column: " + columnName); + } + + if (Objects.isNull(source.getRangeIndex()) == rangeIndex.contains(columnName)) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: range index changed", + tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "range index changed: " + columnName); + } + + // Partition changed or segment not properly partitioned + if (columnName.equals(partitionColumn)) { + PartitionFunction partitionFunction = columnMetadata.getPartitionFunction(); + if (partitionFunction == null) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partition function", + tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "partition function added: " + columnName); + } + if (!partitionFunction.getName().equalsIgnoreCase(partitionConfig.getFunctionName())) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partition function name", + tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "partition function name changed: " + columnName); + } + if (partitionFunction.getNumPartitions() != partitionConfig.getNumPartitions()) { + LOGGER.debug("tableNameWithType: {}, columnName: {},, segmentName: {}, change: num partitions", + tableNameWithType, columnName, segmentName); + return new StaleSegment(segmentName, true, "num partitions changed: " + columnName); + } + Set partitions = columnMetadata.getPartitions(); + if (partitions == null || partitions.size() != 1) { + LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partitions", tableNameWithType, + columnName, segmentName); + return new StaleSegment(segmentName, true, "partitions changed: " + columnName); + } + } + } + + return new StaleSegment(segmentName, false, null); + } + private SegmentDirectory initSegmentDirectory(String segmentName, String segmentCrc, IndexLoadingConfig indexLoadingConfig) throws Exception { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java new file mode 100644 index 000000000000..662c35b56cfc --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java @@ -0,0 +1,386 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.manager; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; +import org.apache.pinot.segment.local.data.manager.StaleSegment; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.segment.spi.creator.SegmentVersion; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.MetricFieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + + +@Test +public class BaseTableDataManagerNeedRefreshTest { + private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "BaseTableDataManagerNeedRefreshTest"); + private static final String DEFAULT_TABLE_NAME = "mytable"; + private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(DEFAULT_TABLE_NAME); + private static final File TABLE_DATA_DIR = new File(TEMP_DIR, OFFLINE_TABLE_NAME); + + private static final String DEFAULT_TIME_COLUMN_NAME = "DaysSinceEpoch"; + private static final String MS_SINCE_EPOCH_COLUMN_NAME = "MilliSecondsSinceEpoch"; + private static final String TEXT_INDEX_COLUMN = "textColumn"; + private static final String TEXT_INDEX_COLUMN_MV = "textColumnMV"; + private static final String PARTITIONED_COLUMN_NAME = "partitionedColumn"; + private static final int NUM_PARTITIONS = 20; // For modulo function + private static final String PARTITION_FUNCTION_NAME = "MoDuLo"; + + private static final String JSON_INDEX_COLUMN = "jsonField"; + private static final String FST_TEST_COLUMN = "DestCityName"; + private static final String NULL_VALUE_COLUMN = "NullValueColumn"; + + private static final TableConfig TABLE_CONFIG; + private static final Schema SCHEMA; + private static final ImmutableSegmentDataManager IMMUTABLE_SEGMENT_DATA_MANAGER; + private static final BaseTableDataManager BASE_TABLE_DATA_MANAGER; + + static { + try { + TABLE_CONFIG = getTableConfigBuilder().build(); + SCHEMA = getSchema(); + IMMUTABLE_SEGMENT_DATA_MANAGER = + createImmutableSegmentDataManager(TABLE_CONFIG, SCHEMA, "basicSegment", generateRows()); + BASE_TABLE_DATA_MANAGER = BaseTableDataManagerTest.createTableManager(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected static TableConfigBuilder getTableConfigBuilder() { + return new TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME) + .setTimeColumnName(DEFAULT_TIME_COLUMN_NAME).setNullHandlingEnabled(true) + .setNoDictionaryColumns(List.of(TEXT_INDEX_COLUMN)); + } + + protected static Schema getSchema() + throws IOException { + return new Schema.SchemaBuilder().addDateTime(DEFAULT_TIME_COLUMN_NAME, FieldSpec.DataType.INT, "1:DAYS:EPOCH", + "1:DAYS") + .addDateTime(MS_SINCE_EPOCH_COLUMN_NAME, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS") + .addSingleValueDimension(PARTITIONED_COLUMN_NAME, FieldSpec.DataType.INT) + .addSingleValueDimension(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING) + .addMultiValueDimension(TEXT_INDEX_COLUMN_MV, FieldSpec.DataType.STRING) + .addSingleValueDimension(JSON_INDEX_COLUMN, FieldSpec.DataType.JSON) + .addSingleValueDimension(FST_TEST_COLUMN, FieldSpec.DataType.STRING) + .addSingleValueDimension(NULL_VALUE_COLUMN, FieldSpec.DataType.STRING).build(); + } + + protected static List generateRows() { + GenericRow row0 = new GenericRow(); + row0.putValue(DEFAULT_TIME_COLUMN_NAME, 20000); + row0.putValue(MS_SINCE_EPOCH_COLUMN_NAME, 20000L * 86400 * 1000); + row0.putValue(TEXT_INDEX_COLUMN, "text_index_column_0"); + row0.putValue(TEXT_INDEX_COLUMN_MV, "text_index_column_0"); + row0.putValue(JSON_INDEX_COLUMN, "{\"a\":\"b\"}"); + row0.putValue(FST_TEST_COLUMN, "fst_test_column_0"); + row0.putValue(PARTITIONED_COLUMN_NAME, 0); + + GenericRow row1 = new GenericRow(); + row1.putValue(DEFAULT_TIME_COLUMN_NAME, 20001); + row1.putValue(MS_SINCE_EPOCH_COLUMN_NAME, 20001L * 86400 * 1000); + row1.putValue(TEXT_INDEX_COLUMN, "text_index_column_0"); + row1.putValue(TEXT_INDEX_COLUMN_MV, "text_index_column_1"); + row1.putValue(JSON_INDEX_COLUMN, "{\"a\":\"b\"}"); + row1.putValue(FST_TEST_COLUMN, "fst_test_column_1"); + row1.putValue(PARTITIONED_COLUMN_NAME, 1); + + GenericRow row2 = new GenericRow(); + row2.putValue(DEFAULT_TIME_COLUMN_NAME, 20002); + row2.putValue(MS_SINCE_EPOCH_COLUMN_NAME, 20002L * 86400 * 1000); + row2.putValue(TEXT_INDEX_COLUMN, "text_index_column_0"); + row2.putValue(TEXT_INDEX_COLUMN_MV, "text_index_column_2"); + row2.putValue(JSON_INDEX_COLUMN, "{\"a\":\"b\"}"); + row2.putValue(FST_TEST_COLUMN, "fst_test_column_2"); + row2.putValue(PARTITIONED_COLUMN_NAME, 2); + + return List.of(row0, row2, row1); + } + + private static File createSegment(TableConfig tableConfig, Schema schema, + String segmentName, List rows) + throws Exception { + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + config.setOutDir(TABLE_DATA_DIR.getAbsolutePath()); + config.setSegmentName(segmentName); + config.setSegmentVersion(SegmentVersion.v3); + + //Create ONE row + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, new GenericRowRecordReader(rows)); + driver.build(); + return new File(TABLE_DATA_DIR, segmentName); + } + + private static ImmutableSegmentDataManager createImmutableSegmentDataManager(TableConfig tableConfig, Schema schema, + String segmentName, List rows) + throws Exception { + ImmutableSegmentDataManager segmentDataManager = mock(ImmutableSegmentDataManager.class); + when(segmentDataManager.getSegmentName()).thenReturn(segmentName); + File indexDir = createSegment(tableConfig, schema, segmentName, rows); + + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, schema); + ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig); + when(segmentDataManager.getSegment()).thenReturn(immutableSegment); + return segmentDataManager; + } + + @Test + void testAddTimeColumn() + throws Exception { + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME).setNullHandlingEnabled(true) + .setNoDictionaryColumns(Collections.singletonList(TEXT_INDEX_COLUMN)).build(); + + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING) + .addSingleValueDimension(JSON_INDEX_COLUMN, FieldSpec.DataType.JSON) + .addSingleValueDimension(FST_TEST_COLUMN, FieldSpec.DataType.STRING).build(); + + GenericRow row = new GenericRow(); + row.putValue(TEXT_INDEX_COLUMN, "text_index_column"); + row.putValue(JSON_INDEX_COLUMN, "{\"a\":\"b\"}"); + row.putValue(FST_TEST_COLUMN, "fst_test_column"); + + ImmutableSegmentDataManager segmentDataManager = + createImmutableSegmentDataManager(tableConfig, schema, "noChanges", List.of(row)); + BaseTableDataManager tableDataManager = BaseTableDataManagerTest.createTableManager(); + + StaleSegment response = + tableDataManager.isSegmentStale(tableConfig, schema, segmentDataManager); + assertFalse(response.isStale()); + + // Test new time column + response = tableDataManager.isSegmentStale(getTableConfigBuilder().build(), getSchema(), segmentDataManager); + assertTrue(response.isStale()); + assertEquals(response.getReason(), "time column"); + } + + @Test + void testChangeTimeColumn() { + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale( + getTableConfigBuilder().setTimeColumnName(MS_SINCE_EPOCH_COLUMN_NAME).build(), SCHEMA, + IMMUTABLE_SEGMENT_DATA_MANAGER); + assertTrue(response.isStale()); + assertEquals(response.getReason(), "time column"); + } + + @Test + void testRemoveColumn() + throws Exception { + Schema schema = getSchema(); + schema.removeField(TEXT_INDEX_COLUMN); + StaleSegment response = + BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); + assertTrue(response.isStale()); + assertEquals(response.getReason(), "column deleted: textColumn"); + } + + @Test + void testFieldType() + throws Exception { + Schema schema = getSchema(); + schema.removeField(TEXT_INDEX_COLUMN); + schema.addField(new MetricFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, true)); + + StaleSegment response = + BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); + assertTrue(response.isStale()); + assertEquals(response.getReason(), "field type changed: textColumn"); + } + + @Test + void testChangeDataType() + throws Exception { + Schema schema = getSchema(); + schema.removeField(TEXT_INDEX_COLUMN); + schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.INT, true)); + + StaleSegment response = + BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); + assertTrue(response.isStale()); + assertEquals(response.getReason(), "data type changed: textColumn"); + } + + @Test + void testChangeToMV() + throws Exception { + Schema schema = getSchema(); + schema.removeField(TEXT_INDEX_COLUMN); + schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, false)); + + StaleSegment response = + BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); + assertTrue(response.isStale()); + assertEquals(response.getReason(), "single / multi value changed: textColumn"); + } + + @Test + void testChangeToSV() + throws Exception { + Schema schema = getSchema(); + schema.removeField(TEXT_INDEX_COLUMN_MV); + schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN_MV, FieldSpec.DataType.STRING, true)); + + StaleSegment response = + BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); + assertTrue(response.isStale()); + assertEquals(response.getReason(), "single / multi value changed: textColumnMV"); + } + + @Test + void testSortColumnMismatch() { + // Check with a column that is not sorted + StaleSegment response = + BASE_TABLE_DATA_MANAGER.isSegmentStale( + getTableConfigBuilder().setSortedColumn(MS_SINCE_EPOCH_COLUMN_NAME).build(), + SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER); + assertTrue(response.isStale()); + assertEquals(response.getReason(), "sort column changed: MilliSecondsSinceEpoch"); + // Check with a column that is sorted + assertFalse( + BASE_TABLE_DATA_MANAGER.isSegmentStale(getTableConfigBuilder().setSortedColumn(TEXT_INDEX_COLUMN).build(), + SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER).isStale()); + } + + @DataProvider(name = "testFilterArgs") + private Object[][] testFilterArgs() { + return new Object[][]{ + { + "withBloomFilter", getTableConfigBuilder().setBloomFilterColumns( + List.of(TEXT_INDEX_COLUMN)).build(), "bloom filter changed: textColumn" + }, { + "withJsonIndex", getTableConfigBuilder().setJsonIndexColumns( + List.of(JSON_INDEX_COLUMN)).build(), "json index changed: jsonField" + }, { + "withTextIndex", getTableConfigBuilder().setFieldConfigList(List.of( + new FieldConfig(TEXT_INDEX_COLUMN, FieldConfig.EncodingType.DICTIONARY, List.of(FieldConfig.IndexType.TEXT), + null, null))).build(), "text index changed: textColumn" + }, { + "withFstIndex", getTableConfigBuilder().setFieldConfigList(List.of( + new FieldConfig(FST_TEST_COLUMN, FieldConfig.EncodingType.DICTIONARY, List.of(FieldConfig.IndexType.FST), + null, Map.of(FieldConfig.TEXT_FST_TYPE, FieldConfig.TEXT_NATIVE_FST_LITERAL)))).build(), + "fst index changed: DestCityName" + }, { + "withRangeFilter", getTableConfigBuilder().setRangeIndexColumns( + List.of(MS_SINCE_EPOCH_COLUMN_NAME)).build(), "range index changed: MilliSecondsSinceEpoch" + } + }; + } + + @Test(dataProvider = "testFilterArgs") + void testFilter(String segmentName, TableConfig tableConfigWithFilter, String expectedReason) + throws Exception { + ImmutableSegmentDataManager segmentWithFilter = + createImmutableSegmentDataManager(tableConfigWithFilter, SCHEMA, segmentName, generateRows()); + + // When TableConfig has a filter but segment does not have, needRefresh is true. + StaleSegment response = + BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfigWithFilter, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER); + assertTrue(response.isStale()); + assertEquals(response.getReason(), expectedReason); + + // When TableConfig does not have a filter but segment has, needRefresh is true + response = BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, segmentWithFilter); + assertTrue(response.isStale()); + assertEquals(response.getReason(), expectedReason); + + // When TableConfig has a filter AND segment also has a filter, needRefresh is false + assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfigWithFilter, SCHEMA, segmentWithFilter).isStale()); + } + + @Test + void testPartition() + throws Exception { + TableConfig partitionedTableConfig = getTableConfigBuilder().setSegmentPartitionConfig(new SegmentPartitionConfig( + Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig(PARTITION_FUNCTION_NAME, NUM_PARTITIONS)))).build(); + ImmutableSegmentDataManager segmentWithPartition = + createImmutableSegmentDataManager(partitionedTableConfig, SCHEMA, "partitionWithModulo", generateRows()); + + // when segment has no partition AND tableConfig has partitions then needRefresh = true + StaleSegment response = + BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfig, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER); + assertTrue(response.isStale()); + assertEquals(response.getReason(), "partition function added: partitionedColumn"); + + // when segment has partitions AND tableConfig has no partitions, then needRefresh = false + assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, segmentWithPartition).isStale()); + + // when # of partitions is different, then needRefresh = true + TableConfig partitionedTableConfig40 = getTableConfigBuilder().setSegmentPartitionConfig(new SegmentPartitionConfig( + Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig(PARTITION_FUNCTION_NAME, 40)))).build(); + + response = BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfig40, SCHEMA, segmentWithPartition); + assertTrue(response.isStale()); + assertEquals(response.getReason(), "num partitions changed: partitionedColumn"); + + // when partition function is different, then needRefresh = true + TableConfig partitionedTableConfigMurmur = getTableConfigBuilder().setSegmentPartitionConfig( + new SegmentPartitionConfig( + Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig("murmur", NUM_PARTITIONS)))).build(); + + response = BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfigMurmur, SCHEMA, segmentWithPartition); + assertTrue(response.isStale()); + assertEquals(response.getReason(), "partition function name changed: partitionedColumn"); + } + + @Test + void testNullValueVector() + throws Exception { + TableConfig withoutNullHandling = getTableConfigBuilder().setNullHandlingEnabled(false).build(); + ImmutableSegmentDataManager segmentWithoutNullHandling = + createImmutableSegmentDataManager(withoutNullHandling, SCHEMA, "withoutNullHandling", generateRows()); + + // If null handling is removed from table config AND segment has NVV, then NVV can be removed. needRefresh = true + StaleSegment response = + BASE_TABLE_DATA_MANAGER.isSegmentStale(withoutNullHandling, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER); + assertTrue(response.isStale()); + assertEquals(response.getReason(), "null value vector index removed from column: NullValueColumn"); + + // if NVV is added to table config AND segment does not have NVV, then it cannot be added. needRefresh = false + assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, segmentWithoutNullHandling).isStale()); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java index 69a8d88fd698..1d17315aa76d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java @@ -648,7 +648,7 @@ public void decrypt(File origFile, File decFile) { } } - private static OfflineTableDataManager createTableManager() { + static OfflineTableDataManager createTableManager() { return createTableManager(createDefaultInstanceDataManagerConfig()); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java new file mode 100644 index 000000000000..c2fbd83cb943 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.core.type.TypeReference; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse; +import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; +import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IndexingConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class StaleSegmentCheckIntegrationTest extends BaseClusterIntegrationTest { + private static final String JSON_FIELD = "jsonField"; + + private PinotTaskManager _taskManager; + private PinotHelixTaskResourceManager _taskResourceManager; + private TableConfig _tableConfig; + private Schema _schema; + private List _avroFiles; + private static final String H3_INDEX_COLUMN = "h3Column"; + private static final Map H3_INDEX_PROPERTIES = Collections.singletonMap("resolutions", "5"); + private static final String TEXT_INDEX_COLUMN = "textColumn"; + private static final String NULL_INDEX_COLUMN = "nullField"; + + private static final String JSON_INDEX_COLUMN = "jsonField"; + private static final String FST_TEST_COLUMN = "DestCityName"; + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServer(); + startMinion(); + // Start Kafka + startKafka(); + + _taskManager = _controllerStarter.getTaskManager(); + _taskResourceManager = _controllerStarter.getHelixTaskResourceManager(); + _avroFiles = unpackAvroData(_tempDir); + + // Create and upload the schema and table config + _schema = createSchema(); + _schema.addField(new DimensionFieldSpec(JSON_FIELD, FieldSpec.DataType.STRING, true)); + _schema.addField(new DimensionFieldSpec(NULL_INDEX_COLUMN, FieldSpec.DataType.STRING, true)); + _schema.addField(new DimensionFieldSpec(H3_INDEX_COLUMN, FieldSpec.DataType.BYTES, true)); + _schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, true)); + + addSchema(_schema); + + _tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setTimeColumnName(getTimeColumnName()) + .setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(true) + .setNoDictionaryColumns(Collections.singletonList(TEXT_INDEX_COLUMN)).build(); + addTableConfig(_tableConfig); + + // Create and upload segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(_avroFiles, _tableConfig, _schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), _tarDir); + + // Wait for all documents loaded + waitForAllDocsLoaded(300_000L); + } + + private FieldConfig getH3FieldConfig() { + return new FieldConfig(H3_INDEX_COLUMN, FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.H3, null, + H3_INDEX_PROPERTIES); + } + + private FieldConfig getTextFieldConfig() { + return new FieldConfig(TEXT_INDEX_COLUMN, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null, null); + } + + private FieldConfig getFstFieldConfig() { + Map propertiesMap = new HashMap<>(); + propertiesMap.put(FieldConfig.TEXT_FST_TYPE, FieldConfig.TEXT_NATIVE_FST_LITERAL); + return new FieldConfig(FST_TEST_COLUMN, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null, + propertiesMap); + } + + @Override + protected IngestionConfig getIngestionConfig() { + List transforms = new ArrayList<>(); + transforms.add(new TransformConfig(JSON_INDEX_COLUMN, + "Groovy({'{\"DestState\":\"'+DestState+'\",\"OriginState\":\"'+OriginState+'\"}'}, DestState, OriginState)")); + transforms.add(new TransformConfig(NULL_INDEX_COLUMN, "Groovy({null})")); + // This is the byte encoding of ST_POINT(-122, 37) + transforms.add(new TransformConfig(H3_INDEX_COLUMN, + "Groovy({[0x00,0xc0,0x5e,0x80,0x00,0x00,0x00,0x00,0x00,0x40,0x42,0x80,0x00,0x00,0x00,0x00,0x00] as byte[]})")); + transforms.add(new TransformConfig(TEXT_INDEX_COLUMN, "Groovy({\"Hello this is a text column\"})")); + + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setTransformConfigs(transforms); + + return ingestionConfig; + } + + @Test + public void testAddRemoveSortedIndex() + throws Exception { + // Add a sorted column to the table + IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); + indexingConfig.setSortedColumn(Collections.singletonList("Carrier")); + updateTableConfig(_tableConfig); + + Map needRefreshResponses = getStaleSegmentsResponse(); + assertEquals(needRefreshResponses.size(), 1); + assertEquals(needRefreshResponses.values().iterator().next().getStaleSegmentList().size(), 12); + } + + @Test(dependsOnMethods = "testAddRemoveSortedIndex") + public void testAddRemoveRawIndex() + throws Exception { + // Add a raw index column + IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); + indexingConfig.setNoDictionaryColumns(Collections.singletonList("ActualElapsedTime")); + updateTableConfig(_tableConfig); + + Map needRefreshResponses = getStaleSegmentsResponse(); + assertEquals(needRefreshResponses.size(), 1); + assertEquals(needRefreshResponses.values().iterator().next().getStaleSegmentList().size(), 12); + } + + @Test(dependsOnMethods = "testAddRemoveSortedIndex") + public void testH3IndexChange() + throws Exception { + // Add a H3 index column + _tableConfig.setFieldConfigList(Collections.singletonList(getH3FieldConfig())); + updateTableConfig(_tableConfig); + + Map needRefreshResponses = getStaleSegmentsResponse(); + assertEquals(needRefreshResponses.size(), 1); + assertEquals(needRefreshResponses.values().iterator().next().getStaleSegmentList().size(), 12); + } + + private Map getStaleSegmentsResponse() + throws IOException { + return JsonUtils.stringToObject(sendGetRequest( + _controllerRequestURLBuilder.forStaleSegments( + TableNameBuilder.OFFLINE.tableNameWithType(getTableName()))), + new TypeReference>() { }); + } + + @AfterClass + public void tearDown() { + try { + stopMinion(); + stopServer(); + stopBroker(); + stopController(); + stopZk(); + } finally { + FileUtils.deleteQuietly(_tempDir); + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegment.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegment.java new file mode 100644 index 000000000000..3e67093f1d78 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegment.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.data.manager; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; + + +/** + * Encapsulates information for a stale segment. It captures segment name, staleness and reason if it is stale. + */ +public class StaleSegment { + private final String _segmentName; + private final boolean _isStale; + private final String _reason; + + @JsonCreator + public StaleSegment(@JsonProperty("segmentName") String segmentName, @JsonProperty("reason") String reason) { + _segmentName = segmentName; + _isStale = true; + _reason = reason; + } + + public StaleSegment(String segmentName, boolean isStale, String reason) { + _segmentName = segmentName; + _isStale = isStale; + _reason = reason; + } + + @JsonProperty + public String getSegmentName() { + return _segmentName; + } + + @JsonIgnore + public boolean isStale() { + return _isStale; + } + + @JsonProperty + public String getReason() { + return _reason; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index 092701bdef8e..cf7e62326971 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -323,4 +323,12 @@ default void onConsumingToDropped(String segmentNameStr) { */ default void onConsumingToOnline(String segmentNameStr) { } + + /** + * Return list of segment names that are stale along with reason. + * @param tableConfig Table Config of the table + * @param schema Schema of the table + * @return List of {@link StaleSegment} with segment names and reason why it is stale + */ + List getStaleSegments(TableConfig tableConfig, Schema schema); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index ce85ec3f3123..ee556fce10e6 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -83,6 +83,7 @@ import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; import org.apache.pinot.core.data.manager.realtime.SegmentUploader; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.StaleSegment; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; import org.apache.pinot.segment.spi.ColumnMetadata; @@ -95,9 +96,11 @@ import org.apache.pinot.server.access.AccessControlFactory; import org.apache.pinot.server.api.AdminApiApplication; import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.ConsumerPartitionState; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.JsonUtils; @@ -979,4 +982,26 @@ public String checkSegmentsReload( return ResourceUtils.convertToJsonString( new ServerSegmentsReloadCheckResponse(needReload, tableDataManager.getInstanceId())); } + + @GET + @Path("/tables/{tableName}/segments/isStale") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get the list of segments that are stale or deviated from table config.", + notes = "Get the list of segments that are stale or deviated from table config") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, + message = "Internal Server error", response = ErrorInfo.class) + }) + public List getStaleSegments( + @ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName, + @Context HttpHeaders headers) { + tableName = DatabaseUtils.translateTableName(tableName, headers); + TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName); + try { + Pair tableConfigSchemaPair = tableDataManager.fetchTableConfigAndSchema(); + return tableDataManager.getStaleSegments(tableConfigSchemaPair.getLeft(), tableConfigSchemaPair.getRight()); + } catch (Exception e) { + throw new WebApplicationException(e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR); + } + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index 335f251995f6..da83dc219419 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -244,6 +244,10 @@ public String forTableNeedReload(String tableNameWithType, boolean verbose) { return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, query); } + public String forStaleSegments(String tableNameWithType) { + return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, "isStale"); + } + public String forTableRebalanceStatus(String jobId) { return StringUtil.join("/", _baseUrl, "rebalanceStatus", jobId); }