From 4da7f88f7177c75230bc3f4bb9b3a7defed18121 Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Tue, 6 Aug 2024 23:01:05 -0700 Subject: [PATCH 01/12] Add multi stream ingestion support --- .../controller/BaseControllerStarter.java | 9 +- .../helix/SegmentStatusChecker.java | 8 +- .../core/PinotTableIdealStateBuilder.java | 11 +- .../MissingConsumingSegmentFinder.java | 18 ++- .../PinotLLCRealtimeSegmentManager.java | 105 ++++++++++------- .../realtime/SegmentCompletionManager.java | 2 +- .../RealtimeSegmentValidationManager.java | 14 ++- .../PinotLLCRealtimeSegmentManagerTest.java | 18 +-- .../DefaultTableDataManagerProvider.java | 2 +- .../realtime/RealtimeSegmentDataManager.java | 16 ++- .../realtime/SegmentCommitterFactory.java | 2 +- .../segment/local/utils/TableConfigUtils.java | 20 ++-- .../PartitionGroupConsumptionStatus.java | 9 ++ .../stream/PartitionGroupMetadataFetcher.java | 64 ++++++----- .../spi/stream/StreamConsumerFactory.java | 2 +- .../pinot/spi/utils/IngestionConfigUtils.java | 106 ++++++++++++++++++ .../spi/utils/IngestionConfigUtilsTest.java | 19 ++-- 17 files changed, 303 insertions(+), 122 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 342413d3559f..5ac57b0bf88d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -577,10 +577,13 @@ protected void configure() { _helixResourceManager.getAllRealtimeTables().forEach(rt -> { TableConfig tableConfig = _helixResourceManager.getTableConfig(rt); if (tableConfig != null) { - Map streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + List> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig); try { - 
StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), - streamConfigMap); + for (Map streamConfigMap : streamConfigMaps) { + StreamConfig.validateConsumerType( + streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), + streamConfigMap); + } } catch (Exception e) { existingHlcTables.add(rt); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index c9a48022c0be..1a5f542dd798 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -26,6 +26,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; @@ -419,10 +420,11 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon numInvalidEndTime); if (tableType == TableType.REALTIME && tableConfig != null) { - StreamConfig streamConfig = - new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); + List streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map( + streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig) + ).collect(Collectors.toList()); new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics, - streamConfig).findAndEmitMetrics(idealState); + streamConfigs).findAndEmitMetrics(idealState); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 23a115417f8b..4c888052b3e5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -79,23 +79,24 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n * the collection of shards in partition group 1, should remain unchanged in the response, * whereas shards 3,4 can be added to new partition groups if needed. * - * @param streamConfig the streamConfig from the tableConfig + * @param streamConfigs the List of streamConfig from the tableConfig * @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current * partition groups. * The size of this list is equal to the number of partition groups, * and is created using the latest segment zk metadata. 
*/ - public static List getPartitionGroupMetadataList(StreamConfig streamConfig, + public static List getPartitionGroupMetadataList(List streamConfigs, List partitionGroupConsumptionStatusList) { PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = - new PartitionGroupMetadataFetcher(streamConfig, partitionGroupConsumptionStatusList); + new PartitionGroupMetadataFetcher(streamConfigs, partitionGroupConsumptionStatusList); try { DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher); return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); } catch (Exception e) { Exception fetcherException = partitionGroupMetadataFetcher.getException(); - LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", streamConfig.getTopicName(), - streamConfig.getTableNameWithType(), fetcherException); + LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", + streamConfigs.stream().map(streamConfig -> streamConfig.getTopicName()).reduce((a, b) -> a + "," + b), + streamConfigs.get(0).getTableNameWithType(), fetcherException); throw new RuntimeException(fetcherException); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java index f4192a5a1a71..5fe2ffe6d6e9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java @@ -24,7 +24,9 @@ import java.time.Instant; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.helix.AccessOption; import org.apache.helix.model.IdealState; import 
org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -65,25 +67,29 @@ public class MissingConsumingSegmentFinder { private ControllerMetrics _controllerMetrics; public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore propertyStore, - ControllerMetrics controllerMetrics, StreamConfig streamConfig) { + ControllerMetrics controllerMetrics, List streamConfigs) { _realtimeTableName = realtimeTableName; _controllerMetrics = controllerMetrics; _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics); _streamPartitionMsgOffsetFactory = - StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); + StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory(); // create partition group id to largest stream offset map _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>(); - streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA); + streamConfigs.stream().map(streamConfig -> { + streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA); + return streamConfig; + }); try { - PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, Collections.emptyList()) + PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList()) .forEach(metadata -> { _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); }); } catch (Exception e) { - LOGGER.warn("Problem encountered in fetching stream metadata for topic: {} of table: {}. " + LOGGER.warn("Problem encountered in fetching stream metadata for topics: {} of table: {}. 
" + "Continue finding missing consuming segment only with ideal state information.", - streamConfig.getTopicName(), streamConfig.getTableNameWithType()); + streamConfigs.stream().map(streamConfig -> streamConfig.getTopicName()).collect(Collectors.toList()), + streamConfigs.get(0).getTableNameWithType()); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 56c0e8f5f0ae..3ae56451d635 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -232,7 +232,7 @@ FileUploadDownloadClient initFileUploadDownloadClient() { * for latest segment of each partition group. */ public List getPartitionGroupConsumptionStatusList(IdealState idealState, - StreamConfig streamConfig) { + List streamConfigs) { List partitionGroupConsumptionStatusList = new ArrayList<>(); // From all segment names in the ideal state, find unique partition group ids and their latest segment @@ -257,12 +257,12 @@ public List getPartitionGroupConsumptionStatusL // Create a {@link PartitionGroupConsumptionStatus} for each latest segment StreamPartitionMsgOffsetFactory offsetFactory = - StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); + StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory(); for (Map.Entry entry : partitionGroupIdToLatestSegment.entrySet()) { int partitionGroupId = entry.getKey(); LLCSegmentName llcSegmentName = entry.getValue(); SegmentZKMetadata segmentZKMetadata = - getSegmentZKMetadata(streamConfig.getTableNameWithType(), llcSegmentName.getSegmentName()); + getSegmentZKMetadata(streamConfigs.get(0).getTableNameWithType(), 
llcSegmentName.getSegmentName()); PartitionGroupConsumptionStatus partitionGroupConsumptionStatus = new PartitionGroupConsumptionStatus(partitionGroupId, llcSegmentName.getSequenceNumber(), offsetFactory.create(segmentZKMetadata.getStartOffset()), @@ -322,11 +322,12 @@ public void setUpNewTable(TableConfig tableConfig, IdealState idealState) { _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName); - StreamConfig streamConfig = - new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); + List streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map( + streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig) + ).collect(Collectors.toList()); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); List newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList()); + getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList()); int numPartitionGroups = newPartitionGroupMetadataList.size(); int numReplicas = getNumReplicas(tableConfig, instancePartitions); @@ -339,7 +340,8 @@ public void setUpNewTable(TableConfig tableConfig, IdealState idealState) { Map> instanceStatesMap = idealState.getRecord().getMapFields(); for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) { String segmentName = - setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions, + setupNewPartitionGroup(tableConfig, streamConfigs.get(0), partitionGroupMetadata, currentTimeMs, + instancePartitions, numPartitionGroups, numReplicas); updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment, instancePartitionsMap); @@ -548,20 +550,21 @@ private void commitSegmentMetadataInternal(String realtimeTableName, long startTimeNs2 = System.nanoTime(); String newConsumingSegmentName = null; if 
(!isTablePaused(idealState)) { - StreamConfig streamConfig = - new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); + List streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map( + streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig) + ).collect(Collectors.toList()); Set partitionIds; try { - partitionIds = getPartitionIds(streamConfig); + partitionIds = getPartitionIds(streamConfigs); } catch (Exception e) { LOGGER.info("Failed to fetch partition ids from stream metadata provider for table: {}, exception: {}. " + "Reading all partition group metadata to determine partition ids.", realtimeTableName, e.toString()); // TODO: Find a better way to determine partition count and if the committing partition group is fully consumed. // We don't need to read partition group metadata for other partition groups. List currentPartitionGroupConsumptionStatusList = - getPartitionGroupConsumptionStatusList(idealState, streamConfig); + getPartitionGroupConsumptionStatusList(idealState, streamConfigs); List newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); partitionIds = newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId) .collect(Collectors.toSet()); } @@ -570,7 +573,7 @@ private void commitSegmentMetadataInternal(String realtimeTableName, long newSegmentCreationTimeMs = getCurrentTimeMs(); LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId, committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs); - createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs, + createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), newLLCSegment, newSegmentCreationTimeMs, 
committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(), numReplicas); newConsumingSegmentName = newLLCSegment.getSegmentName(); @@ -763,7 +766,7 @@ public long getCommitTimeoutMS(String realtimeTableName) { return commitTimeoutMS; } TableConfig tableConfig = getTableConfig(realtimeTableName); - final Map streamConfigs = IngestionConfigUtils.getStreamConfigMap(tableConfig); + final Map streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); if (streamConfigs.containsKey(StreamConfigProperties.SEGMENT_COMMIT_TIMEOUT_SECONDS)) { final String commitTimeoutSecondsStr = streamConfigs.get(StreamConfigProperties.SEGMENT_COMMIT_TIMEOUT_SECONDS); try { @@ -792,15 +795,32 @@ Set getPartitionIds(StreamConfig streamConfig) } } + @VisibleForTesting + Set getPartitionIds(List streamConfigs) + throws Exception { + Set partitionIds = new HashSet<>(); + for (int i = 0; i < streamConfigs.size(); i++) { + final int index = i; + try { + partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream().map( + partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index)) + .collect(Collectors.toSet())); + } catch (Exception e) { + LOGGER.warn("Failed to fetch partition ids for stream: {}", streamConfigs.get(i).getTopicName(), e); + } + } + return partitionIds; + } + /** * Fetches the latest state of the PartitionGroups for the stream * If any partition has reached end of life, and all messages of that partition have been consumed by the segment, * it will be skipped from the result */ @VisibleForTesting - List getNewPartitionGroupMetadataList(StreamConfig streamConfig, + List getNewPartitionGroupMetadataList(List streamConfigs, List currentPartitionGroupConsumptionStatusList) { - return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, + return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); } 
@@ -917,7 +937,7 @@ private Map getLatestSegmentZKMetadataMap(String rea * IN_PROGRESS, and the state for the latest segment in the IDEALSTATE is ONLINE. * If so, it should create a new CONSUMING segment for the partition. */ - public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig, + public void ensureAllPartitionsConsuming(TableConfig tableConfig, List streamConfigs, OffsetCriteria offsetCriteria) { Preconditions.checkState(!_isStopping, "Segment manager is stopping"); @@ -931,15 +951,16 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig s List currentPartitionGroupConsumptionStatusList = offsetsHaveToChange ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions - : getPartitionGroupConsumptionStatusList(idealState, streamConfig); - OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); + : getPartitionGroupConsumptionStatusList(idealState, streamConfigs); + // FIXME: Right now, we assume topics are sharing same offset criteria + OffsetCriteria originalOffsetCriteria = streamConfigs.get(0).getOffsetCriteria(); // Read the smallest offset when a new partition is detected - streamConfig.setOffsetCriteria( - offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA); + streamConfigs.stream().forEach(streamConfig -> streamConfig.setOffsetCriteria(offsetsHaveToChange + ? 
offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA)); List newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); - streamConfig.setOffsetCriteria(originalOffsetCriteria); - return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList, + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); + streamConfigs.stream().forEach(streamConfig -> streamConfig.setOffsetCriteria(originalOffsetCriteria)); + return ensureAllPartitionsConsuming(tableConfig, streamConfigs, idealState, newPartitionGroupMetadataList, offsetCriteria); } else { LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", @@ -1159,8 +1180,8 @@ private boolean isAllInstancesInState(Map instanceStateMap, Stri * TODO: split this method into multiple smaller methods */ @VisibleForTesting - IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig, IdealState idealState, - List partitionGroupMetadataList, OffsetCriteria offsetCriteria) { + IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List streamConfigs, + IdealState idealState, List partitionGroupMetadataList, OffsetCriteria offsetCriteria) { String realtimeTableName = tableConfig.getTableName(); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); @@ -1174,7 +1195,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig st Map> instanceStatesMap = idealState.getRecord().getMapFields(); StreamPartitionMsgOffsetFactory offsetFactory = - StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); + StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory(); // Get the latest segment ZK metadata for each partition Map latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName); @@ 
-1239,7 +1260,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig st CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName, (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0); - createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, + createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName, segmentAssignment, instancePartitionsMap); @@ -1273,7 +1294,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig st // Smallest offset is fetched from stream once and cached in partitionIdToSmallestOffset. if (partitionIdToSmallestOffset == null) { - partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig); + partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfigs); } // Do not create new CONSUMING segment when the stream partition has reached end of life. 
@@ -1287,7 +1308,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig st selectStartOffset(offsetCriteria, partitionId, partitionIdToStartOffset, partitionIdToSmallestOffset, tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning - createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, + createNewConsumingSegment(tableConfig, streamConfigs.get(0), latestSegmentZKMetadata, currentTimeMs, partitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment, instancePartitionsMap, startOffset); } else { @@ -1296,7 +1317,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig st selectStartOffset(offsetCriteria, partitionId, partitionIdToStartOffset, partitionIdToSmallestOffset, tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getEndOffset()); - createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, + createNewConsumingSegment(tableConfig, streamConfigs.get(0), latestSegmentZKMetadata, currentTimeMs, partitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment, instancePartitionsMap, startOffset); } @@ -1343,7 +1364,8 @@ && new LLCSegmentName(segmentEntry.getKey()).getPartitionGroupId() == partitionI int partitionId = partitionGroupMetadata.getPartitionGroupId(); if (!latestSegmentZKMetadataMap.containsKey(partitionId)) { String newSegmentName = - setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions, + setupNewPartitionGroup(tableConfig, streamConfigs.get(0), partitionGroupMetadata, currentTimeMs, + instancePartitions, numPartitions, numReplicas); updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, instancePartitionsMap); @@ -1371,15 +1393,18 @@ private void createNewConsumingSegment(TableConfig 
tableConfig, StreamConfig str instancePartitionsMap); } - private Map fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) { - OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); - streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); - List partitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList()); - streamConfig.setOffsetCriteria(originalOffsetCriteria); + private Map fetchPartitionGroupIdToSmallestOffset( + List streamConfigs) { Map partitionGroupIdToSmallestOffset = new HashMap<>(); - for (PartitionGroupMetadata metadata : partitionGroupMetadataList) { - partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); + for (StreamConfig streamConfig : streamConfigs) { + OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); + streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); + List partitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList()); + streamConfig.setOffsetCriteria(originalOffsetCriteria); + for (PartitionGroupMetadata metadata : partitionGroupMetadataList) { + partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); + } } return partitionGroupIdToSmallestOffset; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java index 63d302f92996..5cae81292449 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java @@ -102,7 +102,7 @@ protected StreamPartitionMsgOffsetFactory getStreamPartitionMsgOffsetFactory(LLC String rawTableName = 
llcSegmentName.getTableName(); TableConfig tableConfig = _segmentManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName)); StreamConfig streamConfig = - new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); + new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0)); return StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 88f1bc6ee692..db3d3087967e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; @@ -104,14 +105,15 @@ protected void processTable(String tableNameWithType, Context context) { LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType); return; } - StreamConfig streamConfig = - new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); + List streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map( + streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig) + ).collect(Collectors.toList()); if (context._runSegmentLevelValidation) { - runSegmentLevelValidation(tableConfig, streamConfig); + runSegmentLevelValidation(tableConfig); } if 
(shouldEnsureConsuming(tableNameWithType)) { - _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig, context._offsetCriteria); + _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfigs, context._offsetCriteria); } } @@ -147,7 +149,7 @@ private boolean shouldEnsureConsuming(String tableNameWithType) { return !isQuotaExceeded; } - private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig streamConfig) { + private void runSegmentLevelValidation(TableConfig tableConfig) { String realtimeTableName = tableConfig.getTableName(); List segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName); @@ -197,7 +199,7 @@ static long computeTotalDocumentCount(List segmentsZKMetadata @Override public void cleanUpTask() { LOGGER.info("Unregister all the validation metrics."); - _validationMetrics.unregisterAllMetrics(); + _validationMetrics.unregisterAllMetrics(); } public static final class Context { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 6fa6518a3d2d..9c96480f49f9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -273,7 +273,7 @@ public void testCommitSegment() { // committing segment's partitionGroupId no longer in the newPartitionGroupMetadataList List partitionGroupMetadataListWithout0 = - segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, Collections.emptyList()); + segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList());
partitionGroupMetadataListWithout0.remove(0); segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout0; @@ -592,7 +592,7 @@ public void testRepairs() { */ // 1 reached end of shard. List partitionGroupMetadataListWithout1 = - segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, Collections.emptyList()); + segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList()); partitionGroupMetadataListWithout1.remove(1); segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout1; // noop @@ -879,7 +879,7 @@ public void testStopSegmentManager() // Expected } try { - segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, null); + segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfigs, null); fail(); } catch (IllegalStateException e) { // Expected @@ -1227,7 +1227,7 @@ private static class FakePinotLLCRealtimeSegmentManager extends PinotLLCRealtime int _numReplicas; TableConfig _tableConfig; - StreamConfig _streamConfig; + List _streamConfigs; int _numInstances; InstancePartitions _consumingInstancePartitions; Map _segmentZKMetadataMap = new HashMap<>(); @@ -1255,8 +1255,8 @@ void makeTableConfig() { _tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(_numReplicas) .setStreamConfigs(streamConfigs).build(); - _streamConfig = - new StreamConfig(_tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(_tableConfig)); + _streamConfigs = IngestionConfigUtils.getStreamConfigMaps(_tableConfig).stream().map( + streamConfig -> new StreamConfig(_tableConfig.getTableName(), streamConfig)).collect(Collectors.toList()); } void makeConsumingInstancePartitions() { @@ -1274,8 +1274,8 @@ public void setUpNewTable() { } public void ensureAllPartitionsConsuming() { - ensureAllPartitionsConsuming(_tableConfig, _streamConfig, 
_idealState, - getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), null); + ensureAllPartitionsConsuming(_tableConfig, _streamConfigs, _idealState, + getNewPartitionGroupMetadataList(_streamConfigs, Collections.emptyList()), null); } @Override @@ -1355,7 +1355,7 @@ Set getPartitionIds(StreamConfig streamConfig) { } @Override - List getNewPartitionGroupMetadataList(StreamConfig streamConfig, + List getNewPartitionGroupMetadataList(List streamConfigs, List currentPartitionGroupConsumptionStatusList) { if (_partitionGroupMetadataList != null) { return _partitionGroupMetadataList; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java index fff62329439a..36caa5b86aa3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java @@ -73,7 +73,7 @@ public TableDataManager getTableDataManager(TableConfig tableConfig, @Nullable E } break; case REALTIME: - Map streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + Map streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); if (Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE)) && StringUtils.isEmpty(_instanceDataManagerConfig.getSegmentStoreUri())) { throw new IllegalStateException(String.format("Table has enabled %s config. 
"But the server has not " diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 684e1ffa531c..bc49b8d13e5d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -283,6 +283,7 @@ public void deleteSegmentFile() { private Thread _consumerThread; private final int _partitionGroupId; + private final int _streamPartitionGroupId; private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus; final String _clientId; private final TransformPipeline _transformPipeline; @@ -1496,12 +1497,16 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); // TODO Validate configs IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); - _streamConfig = new StreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(_tableConfig)); + _partitionGroupId = llcSegmentName.getPartitionGroupId(); + _streamPartitionGroupId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_partitionGroupId); + _streamConfig = new StreamConfig( + _tableNameWithType, + IngestionConfigUtils.getStreamConfigMaps(_tableConfig) + .get(IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(_partitionGroupId))); _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig); _streamPartitionMsgOffsetFactory = _streamConsumerFactory.createStreamMsgOffsetFactory(); String streamTopic = _streamConfig.getTopicName(); _segmentNameStr = _segmentZKMetadata.getSegmentName(); - _partitionGroupId = llcSegmentName.getPartitionGroupId(); _partitionGroupConsumptionStatus = new
PartitionGroupConsumptionStatus(_partitionGroupId, llcSegmentName.getSequenceNumber(), _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getStartOffset()), @@ -1514,9 +1519,9 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf String clientIdSuffix = instanceDataManagerConfig != null ? instanceDataManagerConfig.getConsumerClientIdSuffix() : null; if (StringUtils.isNotBlank(clientIdSuffix)) { - _clientId = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId + "-" + clientIdSuffix; + _clientId = _tableNameWithType + "-" + streamTopic + "-" + _streamPartitionGroupId + "-" + clientIdSuffix; } else { - _clientId = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId; + _clientId = _tableNameWithType + "-" + streamTopic + "-" + _streamPartitionGroupId; } _segmentLogger = LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr); _tableStreamName = _tableNameWithType + "_" + streamTopic; @@ -1832,7 +1837,8 @@ private void recreateStreamConsumer(String reason) { private void createPartitionMetadataProvider(String reason) { closePartitionMetadataProvider(); _segmentLogger.info("Creating new partition metadata provider, reason: {}", reason); - _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId); + _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider( + _clientId, _streamPartitionGroupId); } private void updateIngestionMetrics(RowMetadata metadata) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java index 33a3b55654b2..4224019ab0e1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java +++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java @@ -47,7 +47,7 @@ public SegmentCommitterFactory(Logger segmentLogger, ServerSegmentCompletionProt _protocolHandler = protocolHandler; _tableConfig = tableConfig; _streamConfig = new StreamConfig(_tableConfig.getTableName(), - IngestionConfigUtils.getStreamConfigMap(_tableConfig)); + IngestionConfigUtils.getStreamConfigMaps(_tableConfig).get(0)); _indexLoadingConfig = indexLoadingConfig; _serverMetrics = serverMetrics; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 387f69a44269..5f5129f1eaf7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -169,15 +169,18 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema, @N // Only allow realtime tables with non-null stream.type and LLC consumer.type if (tableConfig.getTableType() == TableType.REALTIME) { - Map streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + List> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig); + // TODO: validate stream configs in the map are identical in most fields StreamConfig streamConfig; - try { - // Validate that StreamConfig can be created - streamConfig = new StreamConfig(tableConfig.getTableName(), streamConfigMap); - } catch (Exception e) { - throw new IllegalStateException("Could not create StreamConfig using the streamConfig map", e); + for (Map streamConfigMap : streamConfigMaps) { + try { + // Validate that StreamConfig can be created + streamConfig = new StreamConfig(tableConfig.getTableName(), streamConfigMap); + } catch (Exception e) { + throw new IllegalStateException("Could not create StreamConfig 
using the streamConfig map", e); + } + validateStreamConfig(streamConfig); } - validateStreamConfig(streamConfig); } validateTierConfigList(tableConfig.getTierConfigsList()); validateIndexingConfig(tableConfig.getIndexingConfig(), schema); @@ -390,7 +393,8 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc Preconditions.checkState(indexingConfig == null || MapUtils.isEmpty(indexingConfig.getStreamConfigs()), "Should not use indexingConfig#getStreamConfigs if ingestionConfig#StreamIngestionConfig is provided"); List> streamConfigMaps = ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps(); - Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream is supported in REALTIME table"); + Preconditions.checkState(streamConfigMaps.size() > 0, "Must have at least 1 stream in REALTIME table"); + // TODO: for multiple stream configs, validate them } // Filter config diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java index d519a2302917..bc02df8462dd 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.spi.stream; +import org.apache.pinot.spi.utils.IngestionConfigUtils; + + /** * A PartitionGroup is a group of partitions/shards that the same consumer should consume from. * This class contains all information which describes the latest state of a partition group. 
@@ -36,6 +39,7 @@ public class PartitionGroupConsumptionStatus { private final int _partitionGroupId; + private final int _streamPartitionGroupId; private int _sequenceNumber; private StreamPartitionMsgOffset _startOffset; private StreamPartitionMsgOffset _endOffset; @@ -44,6 +48,7 @@ public class PartitionGroupConsumptionStatus { public PartitionGroupConsumptionStatus(int partitionGroupId, int sequenceNumber, StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, String status) { _partitionGroupId = partitionGroupId; + _streamPartitionGroupId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); _sequenceNumber = sequenceNumber; _startOffset = startOffset; _endOffset = endOffset; @@ -54,6 +59,10 @@ public int getPartitionGroupId() { return _partitionGroupId; } + public int getStreamPartitionGroupId() { + return _streamPartitionGroupId; + } + public int getSequenceNumber() { return _sequenceNumber; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 98094b9e88cb..d0079c1f5f44 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -18,14 +18,18 @@ */ package org.apache.pinot.spi.stream; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; +import java.util.stream.Collectors; +import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Fetches the list of {@link PartitionGroupMetadata} for all partition groups of the stream, + * Fetches the list of {@link PartitionGroupMetadata} for all partition groups of the streams, * using the {@link StreamMetadataProvider} */ public class PartitionGroupMetadataFetcher implements 
Callable { @@ -33,18 +37,17 @@ public class PartitionGroupMetadataFetcher implements Callable { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class); private List _newPartitionGroupMetadataList; - private final StreamConfig _streamConfig; + private final List _streamConfigs; private final List _partitionGroupConsumptionStatusList; - private final StreamConsumerFactory _streamConsumerFactory; private Exception _exception; - private final String _topicName; + private final List _topicNames; public PartitionGroupMetadataFetcher(StreamConfig streamConfig, List partitionGroupConsumptionStatusList) { - _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); - _topicName = streamConfig.getTopicName(); - _streamConfig = streamConfig; + _topicNames = Arrays.asList(streamConfig.getTopicName()); + _streamConfigs = Arrays.asList(streamConfig); _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList; + _newPartitionGroupMetadataList = new ArrayList<>(); } public List getPartitionGroupMetadataList() { @@ -63,25 +66,36 @@ public Exception getException() { @Override public Boolean call() throws Exception { - String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" - + _streamConfig.getTableNameWithType() + "-" + _topicName; - try ( - StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) { - _newPartitionGroupMetadataList = streamMetadataProvider.computePartitionGroupMetadata(clientId, _streamConfig, - _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000); - if (_exception != null) { - // We had at least one failure, but succeeded now. 
Log an info - LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic {}", _topicName); + for (int i = 0; i < _streamConfigs.size(); i++) { + String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + + _streamConfigs.get(i).getTableNameWithType() + "-" + _topicNames.get(i); + StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfigs.get(i)); + try ( + StreamMetadataProvider streamMetadataProvider = + streamConsumerFactory.createStreamMetadataProvider(clientId)) { + final int index = i; + _newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId, + _streamConfigs.get(i), + _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000).stream().map( + metadata -> new PartitionGroupMetadata( + IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId( + metadata.getPartitionGroupId(), index), + metadata.getStartOffset())).collect(Collectors.toList()) + ); + if (_exception != null) { + // We had at least one failure, but succeeded now. 
Log an info + LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic {}", _topicNames.get(i)); + } + } catch (TransientConsumerException e) { + LOGGER.warn("Transient Exception: Could not get partition count for topic {}", _topicNames.get(i), e); + _exception = e; + return Boolean.FALSE; + } catch (Exception e) { + LOGGER.warn("Could not get partition count for topic {}", _topicNames.get(i), e); + _exception = e; + throw e; } - return Boolean.TRUE; - } catch (TransientConsumerException e) { - LOGGER.warn("Transient Exception: Could not get partition count for topic {}", _topicName, e); - _exception = e; - return Boolean.FALSE; - } catch (Exception e) { - LOGGER.warn("Could not get partition count for topic {}", _topicName, e); - _exception = e; - throw e; } + return Boolean.TRUE; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java index 812b7b8e0f92..a8c4d22cc32a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java @@ -59,7 +59,7 @@ public StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory() { */ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) { - return createPartitionLevelConsumer(clientId, partitionGroupConsumptionStatus.getPartitionGroupId()); + return createPartitionLevelConsumer(clientId, partitionGroupConsumptionStatus.getStreamPartitionGroupId()); } @Deprecated diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index 2aeba4160bf4..edea275f35d5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -19,6 +19,7 @@ package org.apache.pinot.spi.utils; import com.google.common.base.Preconditions; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -46,6 +47,14 @@ private IngestionConfigUtils() { private static final int DEFAULT_PUSH_ATTEMPTS = 5; private static final int DEFAULT_PUSH_PARALLELISM = 1; private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L; + // For partitions from different tables, we pad them with an offset to avoid collision. The offset is far higher + // than the normal max number of partitions on a stream (e.g. 512). + public static final int PARTITION_PADDING_OFFSET = 10000; + public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING = + "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory"; + public static final String STREAM_TYPE = "streamType"; + public static final String STREAM_CONSUMER_FACTORY_CLASS = "stream.consumer.factory.class"; + /** * Fetches the streamConfig from the given realtime table. @@ -75,6 +84,103 @@ public static Map getStreamConfigMap(TableConfig tableConfig) { return streamConfigMap; } + /** + * Fetches the streamConfig from the given realtime table. + * First, the ingestionConfigs->stream->streamConfigs will be checked. + * If not found, the indexingConfig->streamConfigs will be checked (which is deprecated).
+ * @param tableConfig realtime table config + * @return streamConfigs List of maps + */ + public static List> getStreamConfigMaps(TableConfig tableConfig) { + String tableNameWithType = tableConfig.getTableName(); + Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, + "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType); + if (tableConfig.getIngestionConfig() != null + && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { + List> streamConfigMaps = + tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps(); + Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have at least 1 stream"); + // For now, with multiple topics, we only support same type of stream (e.g. Kafka) + String consumerFactoryClassName = null; + for (Map map : streamConfigMaps) { + String type = map.get(STREAM_TYPE); + String consumerFactoryClassKey = + StringUtils.joinWith(".", "stream", type, STREAM_CONSUMER_FACTORY_CLASS); + // For backward compatibility, default consumer factory is for Kafka. + String currentConsumerFactoryClassName = + map.getOrDefault(consumerFactoryClassKey, DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING); + Preconditions.checkState( + consumerFactoryClassName == null || consumerFactoryClassName.equals(currentConsumerFactoryClassName), + "Multiple stream types not supported"); + consumerFactoryClassName = currentConsumerFactoryClassName; + } + return streamConfigMaps; + } + if (tableConfig.getIndexingConfig() != null && tableConfig.getIndexingConfig().getStreamConfigs() != null) { + return Arrays.asList(tableConfig.getIndexingConfig().getStreamConfigs()); + } + throw new IllegalStateException("Could not find streamConfigs for REALTIME table: " + tableNameWithType); + } + + /** + * Getting the Pinot segment level partition id from the stream partition id. 
+ * @param partitionId the partition group id from the stream + * @param index the index of the StreamConfig from the list of StreamConfigs + * @return the Pinot partition id, padded with the stream index offset + */ + public static int getPinotPartitionIdFromStreamPartitionId(int partitionId, int index) { + return index * PARTITION_PADDING_OFFSET + partitionId; + } + + /** + * Getting the Stream partition id from the Pinot segment partition id. + * @param partitionId the segment partition group id on Pinot + * @return the partition id of the underlying stream + */ + public static int getStreamPartitionIdFromPinotPartitionId(int partitionId) { + return partitionId % PARTITION_PADDING_OFFSET; + } + + /** + * Getting the StreamConfig index of StreamConfigs list from the Pinot segment partition id. + * @param partitionId the segment partition group id on Pinot + * @return the index of the StreamConfig in the list + */ + public static int getStreamConfigIndexFromPinotPartitionId(int partitionId) { + return partitionId / PARTITION_PADDING_OFFSET; + } + + /** + * Fetches the streamConfig from the list of streamConfigs according to the partitionGroupId.
+ * @param tableConfig realtime table config + * @param partitionGroupId partitionGroupId + * @return streamConfig map + */ + public static Map getStreamConfigMapWithPartitionGroupId( + TableConfig tableConfig, int partitionGroupId) { + String tableNameWithType = tableConfig.getTableName(); + Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, + "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType); + Map streamConfigMap = null; + if (tableConfig.getIngestionConfig() != null + && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { + List> streamConfigMaps = + tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps(); + Preconditions.checkState( + streamConfigMaps.size() > partitionGroupId / PARTITION_PADDING_OFFSET, + "Table does not have enough number of stream"); + streamConfigMap = streamConfigMaps.get(partitionGroupId / PARTITION_PADDING_OFFSET); + } + if (partitionGroupId < PARTITION_PADDING_OFFSET + && streamConfigMap == null && tableConfig.getIndexingConfig() != null) { + streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs(); + } + if (streamConfigMap == null) { + throw new IllegalStateException("Could not find streamConfigs for REALTIME table: " + tableNameWithType); + } + return streamConfigMap; + } + public static List getAggregationConfigs(TableConfig tableConfig) { String tableNameWithType = tableConfig.getTableName(); Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java index b2b4c87b29e5..1e9517a33011 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Collections; import 
java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; @@ -44,7 +45,7 @@ public class IngestionConfigUtilsTest { public void testGetStreamConfigMap() { TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build(); try { - IngestionConfigUtils.getStreamConfigMap(tableConfig); + IngestionConfigUtils.getStreamConfigMaps(tableConfig); Assert.fail("Should fail for OFFLINE table"); } catch (IllegalStateException e) { // expected @@ -58,7 +59,7 @@ public void testGetStreamConfigMap() { IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(Collections.singletonList(streamConfigMap))); tableConfig.setIngestionConfig(ingestionConfig); - Map actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + Map actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); Assert.assertEquals(actualStreamConfigsMap.size(), 1); Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "kafka"); @@ -69,30 +70,30 @@ public void testGetStreamConfigMap() { IndexingConfig indexingConfig = new IndexingConfig(); indexingConfig.setStreamConfigs(deprecatedStreamConfigMap); tableConfig.setIndexingConfig(indexingConfig); - actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); Assert.assertEquals(actualStreamConfigsMap.size(), 1); Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "kafka"); - // fail if multiple found + // Able to get multiple stream configs ingestionConfig.setStreamIngestionConfig( new StreamIngestionConfig(Arrays.asList(streamConfigMap,
deprecatedStreamConfigMap))); try { - IngestionConfigUtils.getStreamConfigMap(tableConfig); - Assert.fail("Should fail for multiple stream configs"); + List> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig); + Assert.assertEquals(streamConfigs.size(), 2); } catch (IllegalStateException e) { // expected } // get from indexing config tableConfig.setIngestionConfig(null); - actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); Assert.assertEquals(actualStreamConfigsMap.size(), 2); Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "foo"); // fail if found nowhere tableConfig.setIndexingConfig(new IndexingConfig()); try { - IngestionConfigUtils.getStreamConfigMap(tableConfig); + IngestionConfigUtils.getStreamConfigMaps(tableConfig); Assert.fail("Should fail for no stream config found"); } catch (IllegalStateException e) { // expected From e5955cef867ed6ef8b10ad2b80c89d285a7cf552 Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Fri, 9 Aug 2024 16:44:21 -0700 Subject: [PATCH 02/12] Fix UT --- .../apache/pinot/segment/local/utils/TableConfigUtilsTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index 98b0ba552c18..ca3e1e863ada 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -687,9 +687,8 @@ public void ingestionStreamConfigsTest() { // only 1 stream config allowed try { TableConfigUtils.validateIngestionConfig(tableConfig, null); - Assert.fail("Should fail for more than 1 stream config"); } catch (IllegalStateException e) { - // expected + 
Assert.fail("Multiple stream configs should be supported"); } // stream config should be valid From d1e3acc19c4a79cbe9c82b7b78df7d69b56c576d Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Wed, 16 Oct 2024 23:03:45 -0700 Subject: [PATCH 03/12] Fix issues, rebase and resolve comments --- .../RealtimeSegmentValidationManager.java | 2 +- .../pinot/segment/local/utils/TableConfigUtils.java | 4 ++++ .../segment/local/utils/TableConfigUtilsTest.java | 2 +- .../spi/stream/PartitionGroupMetadataFetcher.java | 13 ++++++++++++- .../pinot/spi/stream/StreamMetadataProvider.java | 2 +- 5 files changed, 19 insertions(+), 4 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index db3d3087967e..dbe229ebc9da 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -199,7 +199,7 @@ static long computeTotalDocumentCount(List segmentsZKMetadata @Override public void cleanUpTask() { LOGGER.info("Unregister all the validation metrics."); - _validationMetrics.unregisterAllMetrics();/PinotLLCRealtimeSegmentManagerTest + _validationMetrics.unregisterAllMetrics(); } public static final class Context { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 5f5129f1eaf7..141e0c280a93 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -170,6 +170,10 @@ public static void validate(TableConfig tableConfig, @Nullable Schema 
schema, @N // Only allow realtime tables with non-null stream.type and LLC consumer.type if (tableConfig.getTableType() == TableType.REALTIME) { List> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig); + if (streamConfigMaps.size() > 1) { + Preconditions.checkArgument(!tableConfig.isUpsertEnabled(), + "Multiple stream configs are not supported for upsert tables"); + } // TODO: validate stream configs in the map are identical in most fields StreamConfig streamConfig; for (Map streamConfigMap : streamConfigMaps) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index ca3e1e863ada..e845cf018781 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -684,7 +684,7 @@ public void ingestionStreamConfigsTest() { new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn") .setIngestionConfig(ingestionConfig).build(); - // only 1 stream config allowed + // Multiple stream configs are allowed try { TableConfigUtils.validateIngestionConfig(tableConfig, null); } catch (IllegalStateException e) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index d0079c1f5f44..e5f51d69bf7d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -66,17 +66,24 @@ public Exception getException() { @Override public Boolean call() throws Exception { + _newPartitionGroupMetadataList.clear(); for (int i = 0; i < _streamConfigs.size(); i++) {
String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + _streamConfigs.get(i).getTableNameWithType() + "-" + _topicNames.get(i); StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfigs.get(i)); + final int index = i; + List topicPartitionGroupConsumptionStatusList = + _partitionGroupConsumptionStatusList.stream() + .filter(partitionGroupConsumptionStatus -> + IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId( + partitionGroupConsumptionStatus.getPartitionGroupId()) == index) + .collect(Collectors.toList()); try ( StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId)) { - final int index = i; _newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId, _streamConfigs.get(i), - _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000).stream().map( + topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/5000).stream().map( metadata -> new PartitionGroupMetadata( IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId( metadata.getPartitionGroupId(), index), diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index 85bb2801a1f6..052993a6d0fb 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -81,7 +81,7 @@ default List computePartitionGroupMetadata(String client // If partition group is still in progress, this value will be null for (PartitionGroupConsumptionStatus currentPartitionGroupConsumptionStatus : partitionGroupConsumptionStatuses) { newPartitionGroupMetadataList.add( - new
PartitionGroupMetadata(currentPartitionGroupConsumptionStatus.getPartitionGroupId(), + new PartitionGroupMetadata(currentPartitionGroupConsumptionStatus.getStreamPartitionGroupId(), currentPartitionGroupConsumptionStatus.getEndOffset())); } // Add PartitionGroupMetadata for new partitions From cf5321749cc1f5561e1060d3b797722fd8ec7efb Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Thu, 21 Nov 2024 16:34:13 -0800 Subject: [PATCH 04/12] Resolve comments --- .../controller/BaseControllerStarter.java | 7 +- .../core/PinotTableIdealStateBuilder.java | 5 ++ .../stream/PartitionGroupMetadataFetcher.java | 7 ++ .../apache/pinot/spi/stream/StreamConfig.java | 10 +-- .../pinot/spi/utils/IngestionConfigUtils.java | 70 +++++++------------ 5 files changed, 47 insertions(+), 52 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 5ac57b0bf88d..0326f97a7bca 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -257,7 +257,7 @@ public void init(PinotConfiguration pinotConfiguration) // This executor service is used to do async tasks from multiget util or table rebalancing. 
_executorService = createExecutorService(_config.getControllerExecutorNumThreads(), "async-task-thread-%d"); _tenantRebalanceExecutorService = createExecutorService(_config.getControllerExecutorRebalanceNumThreads(), - "tenant-rebalance-thread-%d"); + "tenant-rebalance-thread-%d"); _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _tenantRebalanceExecutorService); } @@ -272,7 +272,7 @@ public void init(PinotConfiguration pinotConfiguration) private ExecutorService createExecutorService(int numThreadPool, String threadNameFormat) { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(); return (numThreadPool <= 0) ? Executors.newCachedThreadPool(threadFactory) - : Executors.newFixedThreadPool(numThreadPool, threadFactory); + : Executors.newFixedThreadPool(numThreadPool, threadFactory); } private void inferHostnameIfNeeded(ControllerConf config) { @@ -580,8 +580,7 @@ protected void configure() { List> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig); try { for (Map streamConfigMap : streamConfigMaps) { - StreamConfig.validateConsumerType( - streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), + StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), streamConfigMap); } } catch (Exception e) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 4c888052b3e5..94b98e1f14b7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -54,6 +54,7 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n /** * Fetches the list of {@link 
PartitionGroupMetadata} for the new partition groups for the stream, * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups. + * In particular, this method can also be used to fetch from multiple stream topics. * * Reasons why partitionGroupConsumptionStatusList is needed: * @@ -79,7 +80,7 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n * the collection of shards in partition group 1, should remain unchanged in the response, * whereas shards 3,4 can be added to new partition groups if needed. * - * @param streamConfigs the List of streamConfig from the tableConfig + * @param streamConfigs the streamConfigs from the tableConfig * @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current * partition groups. * The size of this list is equal to the number of partition groups, diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index e5f51d69bf7d..88c10a0256e9 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -42,10 +42,10 @@ public class PartitionGroupMetadataFetcher implements Callable { private Exception _exception; private final List _topicNames; - public PartitionGroupMetadataFetcher(StreamConfig streamConfig, - List partitionGroupConsumptionStatusList) { - _topicNames = Arrays.asList(streamConfig.getTopicName()); - _streamConfigs = Arrays.asList(streamConfig); + public PartitionGroupMetadataFetcher(List streamConfigs, + List partitionGroupConsumptionStatusList) { + _topicNames = streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList()); + _streamConfigs = streamConfigs;
_partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList; _newPartitionGroupMetadataList = new ArrayList<>(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java index 39d061473e35..e52610dd6771 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java @@ -223,7 +223,7 @@ public Boolean isServerUploadToDeepStore() { return _serverUploadToDeepStore; } - private double extractFlushThresholdVarianceFraction(Map streamConfigMap) { + public static double extractFlushThresholdVarianceFraction(Map streamConfigMap) { String key = StreamConfigProperties.FLUSH_THRESHOLD_VARIANCE_FRACTION; String flushThresholdVarianceFractionStr = streamConfigMap.get(key); if (flushThresholdVarianceFractionStr != null) { @@ -245,7 +245,7 @@ private double extractFlushThresholdVarianceFraction(Map streamC } } - private long extractFlushThresholdSegmentSize(Map streamConfigMap) { + public static long extractFlushThresholdSegmentSize(Map streamConfigMap) { String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE; String flushThresholdSegmentSizeStr = streamConfigMap.get(key); if (flushThresholdSegmentSizeStr == null) { @@ -264,7 +264,7 @@ private long extractFlushThresholdSegmentSize(Map streamConfigMa } } - protected int extractFlushThresholdRows(Map streamConfigMap) { + public static int extractFlushThresholdRows(Map streamConfigMap) { String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS; String flushThresholdRowsStr = streamConfigMap.get(key); if (flushThresholdRowsStr == null) { @@ -288,7 +288,7 @@ protected int extractFlushThresholdRows(Map streamConfigMap) { } } - protected int extractFlushThresholdSegmentRows(Map streamConfigMap) { + public static int extractFlushThresholdSegmentRows(Map
streamConfigMap) { String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_ROWS; String flushThresholdSegmentRowsStr = streamConfigMap.get(key); if (flushThresholdSegmentRowsStr != null) { @@ -302,7 +302,7 @@ protected int extractFlushThresholdSegmentRows(Map streamConfigM } } - protected long extractFlushThresholdTimeMillis(Map streamConfigMap) { + public static long extractFlushThresholdTimeMillis(Map streamConfigMap) { String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME; String flushThresholdTimeStr = streamConfigMap.get(key); if (flushThresholdTimeStr == null) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index edea275f35d5..6df8a0ad3eb2 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -30,6 +30,7 @@ import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; +import org.apache.pinot.spi.stream.StreamConfig; /** @@ -55,35 +56,6 @@ private IngestionConfigUtils() { public static final String STREAM_TYPE = "streamType"; public static final String STREAM_CONSUMER_FACTORY_CLASS = "stream.consumer.factory.class"; - - /** - * Fetches the streamConfig from the given realtime table. - * First, the ingestionConfigs->stream->streamConfigs will be checked. - * If not found, the indexingConfig->streamConfigs will be checked (which is deprecated). 
- * @param tableConfig realtime table config - * @return streamConfigs map - */ - public static Map getStreamConfigMap(TableConfig tableConfig) { - String tableNameWithType = tableConfig.getTableName(); - Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, - "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType); - Map streamConfigMap = null; - if (tableConfig.getIngestionConfig() != null - && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { - List> streamConfigMaps = - tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps(); - Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream supported per table"); - streamConfigMap = streamConfigMaps.get(0); - } - if (streamConfigMap == null && tableConfig.getIndexingConfig() != null) { - streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs(); - } - if (streamConfigMap == null) { - throw new IllegalStateException("Could not find streamConfigs for REALTIME table: " + tableNameWithType); - } - return streamConfigMap; - } - /** * Fetches the streamConfig from the given realtime table. * First, the ingestionConfigs->stream->streamConfigs will be checked. @@ -99,20 +71,32 @@ public static List> getStreamConfigMaps(TableConfig tableCon && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { List> streamConfigMaps = tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps(); - Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have at least 1 stream"); - // For now, with multiple topics, we only support same type of stream (e.g. Kafka) - String consumerFactoryClassName = null; - for (Map map : streamConfigMaps) { - String type = map.get(STREAM_TYPE); - String consumerFactoryClassKey = - StringUtils.joinWith(".", "stream", type, STREAM_CONSUMER_FACTORY_CLASS); - // For backward compatibility, default consumer factory is for Kafka. 
- String currentConsumerFactoryClassName = - map.getOrDefault(consumerFactoryClassKey, DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING); - Preconditions.checkState( - consumerFactoryClassName == null || consumerFactoryClassName.equals(currentConsumerFactoryClassName), - "Multiple stream types not supported"); - consumerFactoryClassName = currentConsumerFactoryClassName; + Preconditions.checkState(!streamConfigMaps.isEmpty(), "Table must have at least 1 stream"); + /* + Apply the following checks if there are multiple streamConfigs + 1. Check if all streamConfigs have the same stream type. TODO: remove this limitation once we've tested it + 2. Ensure segment flush parameters consistent across all streamConfigs. We need this because Pinot is predefining + the values before fetching stream partition info from stream. At the construction time, we don't know the value + extracted from a streamConfig would be applied to which segment. + TODO: remove this limitation once we've refactored the code and supported it. 
+ */ + Map firstStreamConfigMap = streamConfigMaps.get(0); + for (int i = 1; i < streamConfigMaps.size(); i++) { + Map map = streamConfigMaps.get(i); + Preconditions.checkNotNull(map.get(STREAM_TYPE), + "streamType must be defined for all streamConfigs for REALTIME table: %s", tableNameWithType); + Preconditions.checkState(StringUtils.equals(map.get(STREAM_TYPE), firstStreamConfigMap.get(STREAM_TYPE)) + && StreamConfig.extractFlushThresholdRows(map) == StreamConfig.extractFlushThresholdRows( + firstStreamConfigMap) + && StreamConfig.extractFlushThresholdTimeMillis(map) == StreamConfig.extractFlushThresholdTimeMillis( + firstStreamConfigMap) + && StreamConfig.extractFlushThresholdVarianceFraction(map) + == StreamConfig.extractFlushThresholdVarianceFraction(firstStreamConfigMap) + && StreamConfig.extractFlushThresholdSegmentSize(map) == StreamConfig.extractFlushThresholdSegmentSize( + firstStreamConfigMap) + && StreamConfig.extractFlushThresholdSegmentRows(map) == StreamConfig.extractFlushThresholdSegmentRows( + firstStreamConfigMap), + "All streamConfigs must have the same stream type for REALTIME table: %s", tableNameWithType); } return streamConfigMaps; } From d3ca4a9299d4752811fdb1db7faed8e2dcca33af Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Thu, 21 Nov 2024 16:42:46 -0800 Subject: [PATCH 05/12] Fix style --- .../pinot/spi/stream/PartitionGroupMetadataFetcher.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 88c10a0256e9..e4bd6df12b72 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -19,7 +19,6 @@ package org.apache.pinot.spi.stream; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; 
import java.util.concurrent.Callable; import java.util.stream.Collectors; @@ -36,7 +35,7 @@ public class PartitionGroupMetadataFetcher implements Callable { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class); - private List _newPartitionGroupMetadataList; + private final List _newPartitionGroupMetadataList; private final List _streamConfigs; private final List _partitionGroupConsumptionStatusList; private Exception _exception; @@ -94,10 +93,7 @@ public Boolean call() - _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000).stream().map( -======= topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/5000).stream().map( ->>>>>>> ca24d4bf7b (Fix issues, rebase and resolve comments) - metadata -> new PartitionGroupMetadata( - IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId( - metadata.getPartitionGroupId(), index), + metadata -> new PartitionGroupMetadata( + IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId( + metadata.getPartitionGroupId(), index), metadata.getStartOffset())).collect(Collectors.toList()) ); if (_exception != null) { From 890feb0589fa10d6f10d8c65624dd3ff6725f8b8 Mon Sep 17 00:00:00 2001 From: Christopher Peck Date: Fri, 22 Nov 2024 17:15:13 -0800 Subject: [PATCH 06/12] Ensure transient exceptions do not prevent creating new consuming segments Summary: Ensure transient exceptions do not prevent creating new consuming segments. If some exception is hit, attempt to reconcile any successful fetches with partition group metadata. This ensures consuming partitions are not dropped, and attempts to add any new partitions discovered successfully.
Test Plan: After deployment, despite still some `TransientConsumerException`, no new missing consuming segments appear {F1002071843} {F1002071523} Reviewers: gaoxin, tingchen Reviewed By: gaoxin JIRA Issues: EVA-8951 Differential Revision: https://code.uberinternal.com/D15748639 --- .../PinotLLCRealtimeSegmentManager.java | 40 ++++++++++--------- .../PinotLLCRealtimeSegmentManagerTest.java | 34 +++++++++++++++- 2 files changed, 53 insertions(+), 21 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 3ae56451d635..92a5dde0fe45 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -553,21 +553,7 @@ private void commitSegmentMetadataInternal(String realtimeTableName, List streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map( streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig) ).collect(Collectors.toList()); - Set partitionIds; - try { - partitionIds = getPartitionIds(streamConfigs); - } catch (Exception e) { - LOGGER.info("Failed to fetch partition ids from stream metadata provider for table: {}, exception: {}. " - + "Reading all partition group metadata to determine partition ids.", realtimeTableName, e.toString()); - // TODO: Find a better way to determine partition count and if the committing partition group is fully consumed. - // We don't need to read partition group metadata for other partition groups. 
- List currentPartitionGroupConsumptionStatusList = - getPartitionGroupConsumptionStatusList(idealState, streamConfigs); - List newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); - partitionIds = newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId) - .collect(Collectors.toSet()); - } + Set partitionIds = getPartitionIds(streamConfigs, idealState); if (partitionIds.contains(committingSegmentPartitionGroupId)) { String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); long newSegmentCreationTimeMs = getCurrentTimeMs(); @@ -796,19 +782,35 @@ Set getPartitionIds(StreamConfig streamConfig) } @VisibleForTesting - Set getPartitionIds(List streamConfigs) - throws Exception { + Set getPartitionIds(List streamConfigs, IdealState idealState) { Set partitionIds = new HashSet<>(); + boolean allPartitionIdsFetched = true; for (int i = 0; i < streamConfigs.size(); i++) { final int index = i; try { - partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream().map( - partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index)) + partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream() + .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index)) .collect(Collectors.toSet())); } catch (Exception e) { + allPartitionIdsFetched = false; LOGGER.warn("Failed to fetch partition ids for stream: {}", streamConfigs.get(i).getTopicName(), e); } } + + // Some fetches failed, so ensure we do not miss any partition ids + if (!allPartitionIdsFetched) { + LOGGER.info( + "Fetch partition ids from Stream incomplete, merge fetched partitionIds with partition group metadata " + + "for: {}", idealState.getId()); + // TODO: Find a better way to determine partition count and if the committing partition group is fully consumed. 
+ // We don't need to read partition group metadata for other partition groups. + List currentPartitionGroupConsumptionStatusList = + getPartitionGroupConsumptionStatusList(idealState, streamConfigs); + List newPartitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); + partitionIds.addAll(newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId) + .collect(Collectors.toSet())); + } return partitionIds; } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 9c96480f49f9..974e01bf95c1 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -91,8 +91,8 @@ import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION; import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS; import static org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; import static org.testng.Assert.*; @@ -1214,6 +1214,36 @@ public void testDeleteTmpSegmentFiles() assertEquals(numDeletedTmpSegments, 1); } + @Test + public void testGetPartitionIds() + throws Exception { + List streamConfigs = List.of(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs()); + IdealState idealState = new IdealState("table"); + FakePinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager(); + segmentManager._numPartitions = 2; + + // Test empty ideal state + Set partitionIds = segmentManager.getPartitionIds(streamConfigs, idealState); + Assert.assertEquals(partitionIds.size(), 2); + partitionIds.clear(); + + // Simulate the case where getPartitionIds(StreamConfig) throws an exception (e.g. transient kafka connection issue) + PinotLLCRealtimeSegmentManager segmentManagerSpy = spy(FakePinotLLCRealtimeSegmentManager.class); + doThrow(new RuntimeException()).when(segmentManagerSpy).getPartitionIds(any(StreamConfig.class)); + List partitionGroupConsumptionStatusList = + List.of(new PartitionGroupConsumptionStatus(0, 12, new LongMsgOffset(123), new LongMsgOffset(234), "ONLINE"), + new PartitionGroupConsumptionStatus(1, 12, new LongMsgOffset(123), new LongMsgOffset(345), "ONLINE")); + doReturn(partitionGroupConsumptionStatusList).when(segmentManagerSpy) + .getPartitionGroupConsumptionStatusList(idealState, streamConfigs); + List partitionGroupMetadataList = + List.of(new PartitionGroupMetadata(0, new LongMsgOffset(234)), + new PartitionGroupMetadata(1, new LongMsgOffset(345))); + doReturn(partitionGroupMetadataList).when(segmentManagerSpy) + .getNewPartitionGroupMetadataList(streamConfigs, partitionGroupConsumptionStatusList); + partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs, idealState); + Assert.assertEquals(partitionIds.size(), 2); + } + ////////////////////////////////////////////////////////////////////////////////// // Fake classes ///////////////////////////////////////////////////////////////////////////////// From b4b6e80fed9b31025ee0658866bddf03192a41c1 Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Mon, 9 Dec 2024 11:45:48 -0800 Subject: [PATCH 07/12] Resolve comments for optimizing java doc --- .../helix/core/realtime/PinotLLCRealtimeSegmentManager.java | 3 ++- .../data/manager/realtime/RealtimeSegmentDataManager.java | 6 ++++++ .../pinot/segment/local/utils/TableConfigUtilsTest.java | 2 +- 
.../pinot/tools/predownload/PredownloadException.java | 2 ++ .../apache/pinot/tools/predownload/PredownloadMetrics.java | 2 ++ .../pinot/tools/predownload/PredownloadScheduler.java | 2 ++ .../org/apache/pinot/tools/predownload/SegmentInfo.java | 2 ++ .../org/apache/pinot/tools/predownload/StatusRecorder.java | 2 ++ .../java/org/apache/pinot/tools/predownload/TableInfo.java | 2 ++ .../java/org/apache/pinot/tools/predownload/ZKClient.java | 2 ++ .../apache/pinot/tools/predownload/StatusRecorderTest.java | 2 ++ .../java/org/apache/pinot/tools/predownload/TestUtil.java | 2 ++ 12 files changed, 27 insertions(+), 2 deletions(-) create mode 100644 pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadException.java create mode 100644 pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadMetrics.java create mode 100644 pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadScheduler.java create mode 100644 pinot-tools/src/main/java/org/apache/pinot/tools/predownload/SegmentInfo.java create mode 100644 pinot-tools/src/main/java/org/apache/pinot/tools/predownload/StatusRecorder.java create mode 100644 pinot-tools/src/main/java/org/apache/pinot/tools/predownload/TableInfo.java create mode 100644 pinot-tools/src/main/java/org/apache/pinot/tools/predownload/ZKClient.java create mode 100644 pinot-tools/src/test/java/org/apache/pinot/tools/predownload/StatusRecorderTest.java create mode 100644 pinot-tools/src/test/java/org/apache/pinot/tools/predownload/TestUtil.java diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 92a5dde0fe45..cdc281603c2f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -797,7 +797,8 @@ Set getPartitionIds(List streamConfigs, IdealState idealS } } - // Some fetches failed, so ensure we do not miss any partition ids + // If it is failing to fetch partition ids from stream (usually transient due to stream metadata service outage), + // we need to use the existing partition information from ideal state to keep same ingestion behavior. if (!allPartitionIdsFetched) { LOGGER.info( "Fetch partition ids from Stream incomplete, merge fetched partitionIds with partition group metadata " diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index bc49b8d13e5d..380b358a84ed 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -282,6 +282,12 @@ public void deleteSegmentFile() { private static final int MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS = 31; private Thread _consumerThread; + // _partitionGroupId represents the Pinot's internal partition number which will eventually be used as part of + // segment name. + // _streamPatitionGroupId represents the partition number in the stream topic, which could be derived from the + // _partitionGroupId and identify which partition of the stream topic this consumer is consuming from. + // Note that in traditional single topic ingestion mode, those two concepts were identical which got separated + // in multi-topic ingestion mode. 
private final int _partitionGroupId; private final int _streamPatitionGroupId; private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index e845cf018781..72a17ee7d1c6 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -684,7 +684,7 @@ public void ingestionStreamConfigsTest() { new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn") .setIngestionConfig(ingestionConfig).build(); - // Multiple stream configs is allowed + // Multiple stream configs are allowed try { TableConfigUtils.validateIngestionConfig(tableConfig, null); } catch (IllegalStateException e) { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadException.java b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadException.java new file mode 100644 index 000000000000..a938cfb4997a --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadException.java @@ -0,0 +1,2 @@ +package org.apache.pinot.tools.predownload;public class PredownloadException { +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadMetrics.java b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadMetrics.java new file mode 100644 index 000000000000..8a62c178f6d9 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadMetrics.java @@ -0,0 +1,2 @@ +package org.apache.pinot.tools.predownload;public class PredownloadMetrics { +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadScheduler.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadScheduler.java new file mode 100644 index 000000000000..944d9bc3c770 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadScheduler.java @@ -0,0 +1,2 @@ +package org.apache.pinot.tools.predownload;public class PredownloadScheduler { +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/SegmentInfo.java b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/SegmentInfo.java new file mode 100644 index 000000000000..ae5f8b5c43f8 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/SegmentInfo.java @@ -0,0 +1,2 @@ +package org.apache.pinot.tools.predownload;public class SegmentInfo { +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/StatusRecorder.java b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/StatusRecorder.java new file mode 100644 index 000000000000..68b4a4857e5d --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/StatusRecorder.java @@ -0,0 +1,2 @@ +package org.apache.pinot.tools.predownload;public class StatusRecorder { +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/TableInfo.java b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/TableInfo.java new file mode 100644 index 000000000000..270e7cb0bd68 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/TableInfo.java @@ -0,0 +1,2 @@ +package org.apache.pinot.tools.predownload;public class TableInfo { +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/ZKClient.java b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/ZKClient.java new file mode 100644 index 000000000000..a72f9c7e3758 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/ZKClient.java @@ -0,0 +1,2 @@ +package org.apache.pinot.tools.predownload;public class ZKClient { +} diff 
--git a/pinot-tools/src/test/java/org/apache/pinot/tools/predownload/StatusRecorderTest.java b/pinot-tools/src/test/java/org/apache/pinot/tools/predownload/StatusRecorderTest.java new file mode 100644 index 000000000000..0d7204b2d484 --- /dev/null +++ b/pinot-tools/src/test/java/org/apache/pinot/tools/predownload/StatusRecorderTest.java @@ -0,0 +1,2 @@ +package org.apache.pinot.tools.predownload;public class StatusRecorderTest { +} diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/predownload/TestUtil.java b/pinot-tools/src/test/java/org/apache/pinot/tools/predownload/TestUtil.java new file mode 100644 index 000000000000..a7c2c2bbca4b --- /dev/null +++ b/pinot-tools/src/test/java/org/apache/pinot/tools/predownload/TestUtil.java @@ -0,0 +1,2 @@ +package org.apache.pinot.tools.predownload;public class TestUtil { +} From 34bdc12f16a396b8989098b62c898ac758458e4e Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Mon, 9 Dec 2024 13:02:39 -0800 Subject: [PATCH 08/12] Edit doc/comment --- .../java/org/apache/pinot/spi/utils/IngestionConfigUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index 6df8a0ad3eb2..81e2d9655a4b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -48,7 +48,7 @@ private IngestionConfigUtils() { private static final int DEFAULT_PUSH_ATTEMPTS = 5; private static final int DEFAULT_PUSH_PARALLELISM = 1; private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L; - // For partition from different tables, we pad then with an offset to avoid collision. The offset is far higher + // For partition from different topics, we pad them with an offset. 
The offset is far higher // than the normal max number of partitions on stream (e.g. 512). public static final int PARTITION_PADDING_OFFSET = 10000; public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING = From b0b433c300b25be4d340c83d2c5d3856b41ec5b2 Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Mon, 9 Dec 2024 13:16:46 -0800 Subject: [PATCH 09/12] Remove unrelated files --- .../apache/pinot/tools/predownload/PredownloadException.java | 2 -- .../org/apache/pinot/tools/predownload/PredownloadMetrics.java | 2 -- .../apache/pinot/tools/predownload/PredownloadScheduler.java | 2 -- .../java/org/apache/pinot/tools/predownload/SegmentInfo.java | 2 -- .../java/org/apache/pinot/tools/predownload/StatusRecorder.java | 2 -- .../main/java/org/apache/pinot/tools/predownload/TableInfo.java | 2 -- .../main/java/org/apache/pinot/tools/predownload/ZKClient.java | 2 -- .../org/apache/pinot/tools/predownload/StatusRecorderTest.java | 2 -- .../test/java/org/apache/pinot/tools/predownload/TestUtil.java | 2 -- 9 files changed, 18 deletions(-) delete mode 100644 pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadException.java delete mode 100644 pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadMetrics.java delete mode 100644 pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadScheduler.java delete mode 100644 pinot-tools/src/main/java/org/apache/pinot/tools/predownload/SegmentInfo.java delete mode 100644 pinot-tools/src/main/java/org/apache/pinot/tools/predownload/StatusRecorder.java delete mode 100644 pinot-tools/src/main/java/org/apache/pinot/tools/predownload/TableInfo.java delete mode 100644 pinot-tools/src/main/java/org/apache/pinot/tools/predownload/ZKClient.java delete mode 100644 pinot-tools/src/test/java/org/apache/pinot/tools/predownload/StatusRecorderTest.java delete mode 100644 pinot-tools/src/test/java/org/apache/pinot/tools/predownload/TestUtil.java diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadException.java b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadException.java deleted file mode 100644 index a938cfb4997a..000000000000 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadException.java +++ /dev/null @@ -1,2 +0,0 @@ -package org.apache.pinot.tools.predownload;public class PredownloadException { -} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadMetrics.java b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadMetrics.java deleted file mode 100644 index 8a62c178f6d9..000000000000 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadMetrics.java +++ /dev/null @@ -1,2 +0,0 @@ -package org.apache.pinot.tools.predownload;public class PredownloadMetrics { -} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadScheduler.java b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadScheduler.java deleted file mode 100644 index 944d9bc3c770..000000000000 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/PredownloadScheduler.java +++ /dev/null @@ -1,2 +0,0 @@ -package org.apache.pinot.tools.predownload;public class PredownloadScheduler { -} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/SegmentInfo.java b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/SegmentInfo.java deleted file mode 100644 index ae5f8b5c43f8..000000000000 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/SegmentInfo.java +++ /dev/null @@ -1,2 +0,0 @@ -package org.apache.pinot.tools.predownload;public class SegmentInfo { -} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/StatusRecorder.java b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/StatusRecorder.java deleted file mode 100644 index 68b4a4857e5d..000000000000 --- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/StatusRecorder.java +++ /dev/null @@ -1,2 +0,0 @@ -package org.apache.pinot.tools.predownload;public class StatusRecorder { -} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/TableInfo.java b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/TableInfo.java deleted file mode 100644 index 270e7cb0bd68..000000000000 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/TableInfo.java +++ /dev/null @@ -1,2 +0,0 @@ -package org.apache.pinot.tools.predownload;public class TableInfo { -} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/ZKClient.java b/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/ZKClient.java deleted file mode 100644 index a72f9c7e3758..000000000000 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/predownload/ZKClient.java +++ /dev/null @@ -1,2 +0,0 @@ -package org.apache.pinot.tools.predownload;public class ZKClient { -} diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/predownload/StatusRecorderTest.java b/pinot-tools/src/test/java/org/apache/pinot/tools/predownload/StatusRecorderTest.java deleted file mode 100644 index 0d7204b2d484..000000000000 --- a/pinot-tools/src/test/java/org/apache/pinot/tools/predownload/StatusRecorderTest.java +++ /dev/null @@ -1,2 +0,0 @@ -package org.apache.pinot.tools.predownload;public class StatusRecorderTest { -} diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/predownload/TestUtil.java b/pinot-tools/src/test/java/org/apache/pinot/tools/predownload/TestUtil.java deleted file mode 100644 index a7c2c2bbca4b..000000000000 --- a/pinot-tools/src/test/java/org/apache/pinot/tools/predownload/TestUtil.java +++ /dev/null @@ -1,2 +0,0 @@ -package org.apache.pinot.tools.predownload;public class TestUtil { -} From ad34f173c071e8def4ecb992bdc817d4e3d25e43 Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Tue, 17 Dec 2024 14:11:14 -0800 Subject: [PATCH 10/12] 
Rebase and resolve conflicts --- .../helix/core/PinotTableIdealStateBuilder.java | 4 ---- .../stream/PartitionGroupMetadataFetcher.java | 17 ----------------- 2 files changed, 21 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 94b98e1f14b7..8895d9df50a4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -80,11 +80,7 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n * the collection of shards in partition group 1, should remain unchanged in the response, * whereas shards 3,4 can be added to new partition groups if needed. * -<<<<<<< HEAD * @param streamConfigs the List of streamConfig from the tableConfig -======= - * @param streamConfigs the streamConfigs from the tableConfig ->>>>>>> cae4dc5126 (Resolve comments) * @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current * partition groups. 
* The size of this list is equal to the number of partition groups, diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index e4bd6df12b72..0f8a62f0fba2 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -41,17 +41,10 @@ public class PartitionGroupMetadataFetcher implements Callable { private Exception _exception; private final List _topicNames; -<<<<<<< HEAD - public PartitionGroupMetadataFetcher(StreamConfig streamConfig, - List partitionGroupConsumptionStatusList) { - _topicNames = Arrays.asList(streamConfig.getTopicName()); - _streamConfigs = Arrays.asList(streamConfig); -======= public PartitionGroupMetadataFetcher(List streamConfigs, List partitionGroupConsumptionStatusList) { _topicNames = streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList()); _streamConfigs = streamConfigs; ->>>>>>> cae4dc5126 (Resolve comments) _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList; _newPartitionGroupMetadataList = new ArrayList<>(); } @@ -89,20 +82,10 @@ public Boolean call() streamConsumerFactory.createStreamMetadataProvider(clientId)) { _newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId, _streamConfigs.get(i), -<<<<<<< HEAD - _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000).stream().map( -======= topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/5000).stream().map( -<<<<<<< HEAD ->>>>>>> ca24d4bf7b (Fix issues, rebase and resolve comments) - metadata -> new PartitionGroupMetadata( - IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId( - metadata.getPartitionGroupId(), index), -======= metadata -> new PartitionGroupMetadata( 
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId( metadata.getPartitionGroupId(), index), ->>>>>>> 1c346671d0 (Fix style) metadata.getStartOffset())).collect(Collectors.toList()) ); if (_exception != null) { From ab7b85bac062001fe65f392e2218069f28ebe657 Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Tue, 17 Dec 2024 14:16:38 -0800 Subject: [PATCH 11/12] Take the metadata fetch time change from the HEAD --- .../apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 0f8a62f0fba2..158e28ce728c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -82,7 +82,7 @@ public Boolean call() streamConsumerFactory.createStreamMetadataProvider(clientId)) { _newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId, _streamConfigs.get(i), - topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/5000).stream().map( + topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000).stream().map( metadata -> new PartitionGroupMetadata( IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId( metadata.getPartitionGroupId(), index), From d8f46da1757f7c5425257ad8d9fd9b8ec651f708 Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Tue, 17 Dec 2024 14:31:15 -0800 Subject: [PATCH 12/12] Resolve conflicts --- .../helix/core/realtime/SegmentCompletionManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java 
index 5cae81292449..5bb3f861d7b0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java @@ -131,7 +131,7 @@ private SegmentCompletionFSM createFsm(LLCSegmentName llcSegmentName, String msg TableConfig tableConfig = _segmentManager.getTableConfig(realtimeTableName); String factoryName = null; try { - Map streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + Map streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); factoryName = streamConfigMap.get(StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME); } catch (Exception e) { // If there is an exception, we default to the default factory.