diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 342413d3559f..0326f97a7bca 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -257,7 +257,7 @@ public void init(PinotConfiguration pinotConfiguration)
     // This executor service is used to do async tasks from multiget util or table rebalancing.
     _executorService = createExecutorService(_config.getControllerExecutorNumThreads(), "async-task-thread-%d");
     _tenantRebalanceExecutorService = createExecutorService(_config.getControllerExecutorRebalanceNumThreads(),
-        "tenant-rebalance-thread-%d");
+        "tenant-rebalance-thread-%d");
     _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _tenantRebalanceExecutorService);
   }
@@ -272,7 +272,7 @@ public void init(PinotConfiguration pinotConfiguration)
   private ExecutorService createExecutorService(int numThreadPool, String threadNameFormat) {
     ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build();
     return (numThreadPool <= 0) ? Executors.newCachedThreadPool(threadFactory)
-        : Executors.newFixedThreadPool(numThreadPool, threadFactory);
+        : Executors.newFixedThreadPool(numThreadPool, threadFactory);
   }

   private void inferHostnameIfNeeded(ControllerConf config) {
@@ -577,10 +577,12 @@ protected void configure() {
     _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
       TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
       if (tableConfig != null) {
-        Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig);
         try {
-          StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"),
-              streamConfigMap);
+          for (Map<String, String> streamConfigMap : streamConfigMaps) {
+            StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"),
+                streamConfigMap);
+          }
         } catch (Exception e) {
           existingHlcTables.add(rt);
         }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index c9a48022c0be..1a5f542dd798 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -26,6 +26,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -419,10 +420,11 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
         numInvalidEndTime);
     if (tableType == TableType.REALTIME && tableConfig != null) {
-      StreamConfig streamConfig =
-          new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map(
+          streamConfigMap -> new StreamConfig(tableConfig.getTableName(), streamConfigMap)
+      ).collect(Collectors.toList());
       new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics,
-          streamConfig).findAndEmitMetrics(idealState);
+          streamConfigs).findAndEmitMetrics(idealState);
     }
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 23a115417f8b..8895d9df50a4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -54,6 +54,7 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n
   /**
    * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream,
    * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups.
+   * In particular, this method can also be used to fetch from multiple stream topics.
    *
    * Reasons why partitionGroupConsumptionStatusList is needed:
    *
@@ -79,23 +80,24 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n
    * the collection of shards in partition group 1, should remain unchanged in the response,
    * whereas shards 3,4 can be added to new partition groups if needed.
    *
-   * @param streamConfig the streamConfig from the tableConfig
+   * @param streamConfigs the list of streamConfigs from the tableConfig
    * @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current
    *                                            partition groups.
    *                                            The size of this list is equal to the number of partition groups,
    *                                            and is created using the latest segment zk metadata.
    */
-  public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
+  public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
       List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) {
     PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
-        new PartitionGroupMetadataFetcher(streamConfig, partitionGroupConsumptionStatusList);
+        new PartitionGroupMetadataFetcher(streamConfigs, partitionGroupConsumptionStatusList);
     try {
       DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
       return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
     } catch (Exception e) {
       Exception fetcherException = partitionGroupMetadataFetcher.getException();
-      LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", streamConfig.getTopicName(),
-          streamConfig.getTableNameWithType(), fetcherException);
+      LOGGER.error("Could not get PartitionGroupMetadata for topics: {} of table: {}",
+          streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.joining(",")),
+          streamConfigs.get(0).getTableNameWithType(), fetcherException);
       throw new RuntimeException(fetcherException);
     }
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index f4192a5a1a71..5fe2ffe6d6e9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -24,7 +24,9 @@
 import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.helix.AccessOption;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -65,25 +67,29 @@ public class MissingConsumingSegmentFinder {
   private ControllerMetrics _controllerMetrics;

   public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore,
-      ControllerMetrics controllerMetrics, StreamConfig streamConfig) {
+      ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs) {
     _realtimeTableName = realtimeTableName;
     _controllerMetrics = controllerMetrics;
     _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics);
     _streamPartitionMsgOffsetFactory =
-        StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+        StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();

     // create partition group id to largest stream offset map
     _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>();
-    streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA);
+    streamConfigs.forEach(streamConfig -> streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA));
     try {
-      PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, Collections.emptyList())
+      PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList())
           .forEach(metadata -> {
             _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
           });
     } catch (Exception e) {
-      LOGGER.warn("Problem encountered in fetching stream metadata for topic: {} of table: {}. "
+      LOGGER.warn("Problem encountered in fetching stream metadata for topics: {} of table: {}. "
           + "Continue finding missing consuming segment only with ideal state information.",
-          streamConfig.getTopicName(), streamConfig.getTableNameWithType());
+          streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList()),
+          streamConfigs.get(0).getTableNameWithType());
     }
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 56c0e8f5f0ae..cdc281603c2f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -232,7 +232,7 @@ FileUploadDownloadClient initFileUploadDownloadClient() {
   * for latest segment of each partition group.
   */
  public List<PartitionGroupConsumptionStatus> getPartitionGroupConsumptionStatusList(IdealState idealState,
-      StreamConfig streamConfig) {
+      List<StreamConfig> streamConfigs) {
    List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList = new ArrayList<>();

    // From all segment names in the ideal state, find unique partition group ids and their latest segment
@@ -257,12 +257,12 @@ public List<PartitionGroupConsumptionStatus> getPartitionGroupConsumptionStatusL
    // Create a {@link PartitionGroupConsumptionStatus} for each latest segment
    StreamPartitionMsgOffsetFactory offsetFactory =
-        StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+        StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
    for (Map.Entry<Integer, LLCSegmentName> entry : partitionGroupIdToLatestSegment.entrySet()) {
      int partitionGroupId = entry.getKey();
      LLCSegmentName llcSegmentName = entry.getValue();
      SegmentZKMetadata segmentZKMetadata =
-          getSegmentZKMetadata(streamConfig.getTableNameWithType(), llcSegmentName.getSegmentName());
+          getSegmentZKMetadata(streamConfigs.get(0).getTableNameWithType(), llcSegmentName.getSegmentName());
      PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
          new PartitionGroupConsumptionStatus(partitionGroupId, llcSegmentName.getSequenceNumber(),
              offsetFactory.create(segmentZKMetadata.getStartOffset()),
@@ -322,11 +322,12 @@ public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
    _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
-    StreamConfig streamConfig =
-        new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map(
+        streamConfigMap -> new StreamConfig(tableConfig.getTableName(), streamConfigMap)
+    ).collect(Collectors.toList());
    InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
+        getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList());
    int numPartitionGroups = newPartitionGroupMetadataList.size();
    int numReplicas = getNumReplicas(tableConfig, instancePartitions);
@@ -339,7 +340,8 @@ public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
    for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
      String segmentName =
-          setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
+          setupNewPartitionGroup(tableConfig, streamConfigs.get(0), partitionGroupMetadata, currentTimeMs,
+              instancePartitions,
              numPartitionGroups, numReplicas);
      updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
          instancePartitionsMap);
@@ -548,29 +550,16 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
    long startTimeNs2 = System.nanoTime();
    String newConsumingSegmentName = null;
    if (!isTablePaused(idealState)) {
-      StreamConfig streamConfig =
-          new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
-      Set<Integer> partitionIds;
-      try {
-        partitionIds = getPartitionIds(streamConfig);
-      } catch (Exception e) {
-        LOGGER.info("Failed to fetch partition ids from stream metadata provider for table: {}, exception: {}. "
-            + "Reading all partition group metadata to determine partition ids.", realtimeTableName, e.toString());
-        // TODO: Find a better way to determine partition count and if the committing partition group is fully consumed.
-        //       We don't need to read partition group metadata for other partition groups.
-        List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
-            getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-        List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-            getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
-        partitionIds = newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
-            .collect(Collectors.toSet());
-      }
+      List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map(
+          streamConfigMap -> new StreamConfig(tableConfig.getTableName(), streamConfigMap)
+      ).collect(Collectors.toList());
+      Set<Integer> partitionIds = getPartitionIds(streamConfigs, idealState);
      if (partitionIds.contains(committingSegmentPartitionGroupId)) {
        String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
        long newSegmentCreationTimeMs = getCurrentTimeMs();
        LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
            committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-        createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+        createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), newLLCSegment, newSegmentCreationTimeMs,
            committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
            numReplicas);
        newConsumingSegmentName = newLLCSegment.getSegmentName();
@@ -763,7 +752,7 @@ public long getCommitTimeoutMS(String realtimeTableName) {
      return commitTimeoutMS;
    }
    TableConfig tableConfig = getTableConfig(realtimeTableName);
-    final Map<String, String> streamConfigs = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+    final Map<String, String> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
    if (streamConfigs.containsKey(StreamConfigProperties.SEGMENT_COMMIT_TIMEOUT_SECONDS)) {
      final String commitTimeoutSecondsStr = streamConfigs.get(StreamConfigProperties.SEGMENT_COMMIT_TIMEOUT_SECONDS);
      try {
@@ -792,15 +781,49 @@ Set<Integer> getPartitionIds(StreamConfig streamConfig)
    }
  }

+  @VisibleForTesting
+  Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealState) {
+    Set<Integer> partitionIds = new HashSet<>();
+    boolean allPartitionIdsFetched = true;
+    for (int i = 0; i < streamConfigs.size(); i++) {
+      final int index = i;
+      try {
+        partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream()
+            .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index))
+            .collect(Collectors.toSet()));
+      } catch (Exception e) {
+        allPartitionIdsFetched = false;
+        LOGGER.warn("Failed to fetch partition ids for stream: {}", streamConfigs.get(i).getTopicName(), e);
+      }
+    }
+
+    // If fetching partition ids from the stream fails (usually a transient stream metadata service outage), fall
+    // back to the existing partition information in the ideal state to keep the same ingestion behavior.
+    if (!allPartitionIdsFetched) {
+      LOGGER.info("Fetching partition ids from stream was incomplete; merging fetched partition ids with partition "
+          + "group metadata for: {}", idealState.getId());
+      // TODO: Find a better way to determine partition count and if the committing partition group is fully consumed.
+      //       We don't need to read partition group metadata for other partition groups.
+      List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
+          getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+      List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+          getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList);
+      partitionIds.addAll(newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
+          .collect(Collectors.toSet()));
+    }
+    return partitionIds;
+  }
+
  /**
   * Fetches the latest state of the PartitionGroups for the stream
   * If any partition has reached end of life, and all messages of that partition have been consumed by the segment,
   * it will be skipped from the result
   */
  @VisibleForTesting
-  List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig,
+  List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
      List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) {
-    return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig,
+    return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
        currentPartitionGroupConsumptionStatusList);
  }
@@ -917,7 +940,7 @@ private Map<Integer, SegmentZKMetadata> getLatestSegmentZKMetadataMap(String rea
   * IN_PROGRESS, and the state for the latest segment in the IDEALSTATE is ONLINE.
   * If so, it should create a new CONSUMING segment for the partition.
   */
-  public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig,
+  public void ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamConfig> streamConfigs,
      OffsetCriteria offsetCriteria) {
    Preconditions.checkState(!_isStopping, "Segment manager is stopping");
@@ -931,15 +954,16 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig s
      List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = offsetsHaveToChange
          ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions
-          : getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-      OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+          : getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+      // FIXME: Right now we assume that all topics share the same offset criteria
+      OffsetCriteria originalOffsetCriteria = streamConfigs.get(0).getOffsetCriteria();
      // Read the smallest offset when a new partition is detected
-      streamConfig.setOffsetCriteria(
-          offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+      streamConfigs.forEach(streamConfig -> streamConfig.setOffsetCriteria(offsetsHaveToChange
+          ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
      List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-          getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
-      streamConfig.setOffsetCriteria(originalOffsetCriteria);
-      return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList,
+          getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList);
+      streamConfigs.forEach(streamConfig -> streamConfig.setOffsetCriteria(originalOffsetCriteria));
+      return ensureAllPartitionsConsuming(tableConfig, streamConfigs, idealState, newPartitionGroupMetadataList,
          offsetCriteria);
    } else {
      LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}",
@@ -1159,8 +1183,8 @@ private boolean isAllInstancesInState(Map<String, String> instanceStateMap, Stri
   * TODO: split this method into multiple smaller methods
   */
  @VisibleForTesting
-  IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig, IdealState idealState,
-      List<PartitionGroupMetadata> partitionGroupMetadataList, OffsetCriteria offsetCriteria) {
+  IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamConfig> streamConfigs,
+      IdealState idealState, List<PartitionGroupMetadata> partitionGroupMetadataList, OffsetCriteria offsetCriteria) {
    String realtimeTableName = tableConfig.getTableName();
    InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
@@ -1174,7 +1198,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig st
    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
    StreamPartitionMsgOffsetFactory offsetFactory =
-        StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+        StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();

    // Get the latest segment ZK metadata for each partition
    Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName);
@@ -1239,7 +1263,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig st
            CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName,
                (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
-            createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
+            createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), newLLCSegmentName, currentTimeMs,
                committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
            updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
                segmentAssignment, instancePartitionsMap);
@@ -1273,7 +1297,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig st
            // Smallest offset is fetched from stream once and cached in partitionIdToSmallestOffset.
            if (partitionIdToSmallestOffset == null) {
-              partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig);
+              partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfigs);
            }
            // Do not create new CONSUMING segment when the stream partition has reached end of life.
@@ -1287,7 +1311,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig st
                selectStartOffset(offsetCriteria, partitionId, partitionIdToStartOffset, partitionIdToSmallestOffset,
                    tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getStartOffset());
            // segments are OFFLINE; start from beginning
-            createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
+            createNewConsumingSegment(tableConfig, streamConfigs.get(0), latestSegmentZKMetadata, currentTimeMs,
                partitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
                instancePartitionsMap, startOffset);
          } else {
@@ -1296,7 +1320,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig st
                selectStartOffset(offsetCriteria, partitionId, partitionIdToStartOffset, partitionIdToSmallestOffset,
                    tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getEndOffset());
-            createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
+            createNewConsumingSegment(tableConfig, streamConfigs.get(0), latestSegmentZKMetadata, currentTimeMs,
                partitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
                instancePartitionsMap, startOffset);
          }
@@ -1343,7 +1367,8 @@ && new LLCSegmentName(segmentEntry.getKey()).getPartitionGroupId() == partitionI
      int partitionId = partitionGroupMetadata.getPartitionGroupId();
      if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
        String newSegmentName =
-            setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
+            setupNewPartitionGroup(tableConfig, streamConfigs.get(0), partitionGroupMetadata, currentTimeMs,
+                instancePartitions,
                numPartitions, numReplicas);
        updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
            instancePartitionsMap);
@@ -1371,15 +1396,18 @@ private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig str
        instancePartitionsMap);
  }

-  private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) {
-    OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
-    streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
-    List<PartitionGroupMetadata> partitionGroupMetadataList =
-        getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
-    streamConfig.setOffsetCriteria(originalOffsetCriteria);
+  private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(
+      List<StreamConfig> streamConfigs) {
    Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>();
-    for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
-      partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
+    for (StreamConfig streamConfig : streamConfigs) {
+      OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+      streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+      List<PartitionGroupMetadata> partitionGroupMetadataList =
+          getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList());
+      streamConfig.setOffsetCriteria(originalOffsetCriteria);
+      for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
+        partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
+      }
    }
    return partitionGroupIdToSmallestOffset;
  }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 63d302f92996..5bb3f861d7b0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -102,7 +102,7 @@ protected StreamPartitionMsgOffsetFactory getStreamPartitionMsgOffsetFactory(LLC
    String rawTableName = llcSegmentName.getTableName();
    TableConfig tableConfig = _segmentManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
    StreamConfig streamConfig =
-        new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+        new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0));
    return StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
  }
@@ -131,7 +131,7 @@ private SegmentCompletionFSM createFsm(LLCSegmentName llcSegmentName, String msg
    TableConfig tableConfig = _segmentManager.getTableConfig(realtimeTableName);
    String factoryName = null;
    try {
-      Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+      Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
      factoryName = streamConfigMap.get(StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME);
    } catch (Exception e) {
      // If there is an exception, we default to the default factory.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 88f1bc6ee692..dbe229ebc9da 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -104,14 +105,15 @@ protected void processTable(String tableNameWithType, Context context) {
      LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
      return;
    }
-    StreamConfig streamConfig =
-        new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map(
+        streamConfigMap -> new StreamConfig(tableConfig.getTableName(), streamConfigMap)
+    ).collect(Collectors.toList());
    if (context._runSegmentLevelValidation) {
-      runSegmentLevelValidation(tableConfig, streamConfig);
+      runSegmentLevelValidation(tableConfig);
    }
    if (shouldEnsureConsuming(tableNameWithType)) {
-      _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig, context._offsetCriteria);
+      _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfigs, context._offsetCriteria);
    }
  }
@@ -147,7 +149,7 @@ private boolean shouldEnsureConsuming(String tableNameWithType) {
    return !isQuotaExceeded;
  }

-  private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig streamConfig) {
+  private void runSegmentLevelValidation(TableConfig tableConfig) {
    String realtimeTableName = tableConfig.getTableName();
    List<SegmentZKMetadata> segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 6fa6518a3d2d..974e01bf95c1 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -91,8 +91,8 @@
 import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION;
 import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS;
 import static org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
 import static org.testng.Assert.*;
@@ -273,7 +273,7 @@ public void testCommitSegment() {
    // committing segment's partitionGroupId no longer in the newPartitionGroupMetadataList
    List<PartitionGroupMetadata> partitionGroupMetadataListWithout0 =
-        segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, Collections.emptyList());
+        segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList());
    partitionGroupMetadataListWithout0.remove(0);
    segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout0;
@@ -592,7 +592,7 @@ public void testRepairs() {
     */
    // 1 reached end of shard.
    List<PartitionGroupMetadata> partitionGroupMetadataListWithout1 =
-        segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, Collections.emptyList());
+        segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList());
    partitionGroupMetadataListWithout1.remove(1);
    segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout1;
    // noop
@@ -879,7 +879,7 @@ public void testStopSegmentManager()
      // Expected
    }
    try {
-      segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, null);
+      segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfigs, null);
      fail();
    } catch (IllegalStateException e) {
      // Expected
@@ -1214,6 +1214,36 @@ public void testDeleteTmpSegmentFiles()
    assertEquals(numDeletedTmpSegments, 1);
  }

+  @Test
+  public void testGetPartitionIds()
+      throws Exception {
+    List<StreamConfig> streamConfigs = List.of(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs());
+    IdealState idealState = new IdealState("table");
+    FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager();
+    segmentManager._numPartitions = 2;
+
+    // Test empty ideal state
+    Set<Integer> partitionIds = segmentManager.getPartitionIds(streamConfigs, idealState);
+    Assert.assertEquals(partitionIds.size(), 2);
+    partitionIds.clear();
+
+    // Simulate the case where getPartitionIds(StreamConfig) throws an exception (e.g. transient kafka connection
+    // issue)
+    PinotLLCRealtimeSegmentManager segmentManagerSpy = spy(FakePinotLLCRealtimeSegmentManager.class);
+    doThrow(new RuntimeException()).when(segmentManagerSpy).getPartitionIds(any(StreamConfig.class));
+    List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList =
+        List.of(new PartitionGroupConsumptionStatus(0, 12, new LongMsgOffset(123), new LongMsgOffset(234), "ONLINE"),
+            new PartitionGroupConsumptionStatus(1, 12, new LongMsgOffset(123), new LongMsgOffset(345), "ONLINE"));
+    doReturn(partitionGroupConsumptionStatusList).when(segmentManagerSpy)
+        .getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+    List<PartitionGroupMetadata> partitionGroupMetadataList =
+        List.of(new PartitionGroupMetadata(0, new LongMsgOffset(234)),
+            new PartitionGroupMetadata(1, new LongMsgOffset(345)));
+    doReturn(partitionGroupMetadataList).when(segmentManagerSpy)
+        .getNewPartitionGroupMetadataList(streamConfigs, partitionGroupConsumptionStatusList);
+    partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs, idealState);
+    Assert.assertEquals(partitionIds.size(), 2);
+  }
+
  //////////////////////////////////////////////////////////////////////////////////
  // Fake classes
  /////////////////////////////////////////////////////////////////////////////////
@@ -1227,7 +1257,7 @@ private static class FakePinotLLCRealtimeSegmentManager extends PinotLLCRealtime
    int _numReplicas;
    TableConfig _tableConfig;
-    StreamConfig _streamConfig;
+    List<StreamConfig> _streamConfigs;
    int _numInstances;
    InstancePartitions _consumingInstancePartitions;
    Map<String, SegmentZKMetadata> _segmentZKMetadataMap = new HashMap<>();
@@ -1255,8 +1285,8 @@ void makeTableConfig() {
      _tableConfig =
          new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(_numReplicas)
              .setStreamConfigs(streamConfigs).build();
-      _streamConfig =
-          new StreamConfig(_tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(_tableConfig));
+      _streamConfigs = IngestionConfigUtils.getStreamConfigMaps(_tableConfig).stream().map(
+          streamConfigMap -> new StreamConfig(_tableConfig.getTableName(), streamConfigMap))
+          .collect(Collectors.toList());
    }

    void makeConsumingInstancePartitions() {
@@ -1274,8 +1304,8 @@ public void setUpNewTable() {
    }

    public void ensureAllPartitionsConsuming() {
-      ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
-          getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), null);
+      ensureAllPartitionsConsuming(_tableConfig, _streamConfigs, _idealState,
+          getNewPartitionGroupMetadataList(_streamConfigs, Collections.emptyList()), null);
    }

    @Override
@@ -1355,7 +1385,7 @@ Set<Integer> getPartitionIds(StreamConfig streamConfig) {
    }

    @Override
-    List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig,
+    List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
        List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) {
      if (_partitionGroupMetadataList != null) {
        return _partitionGroupMetadataList;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index fff62329439a..36caa5b86aa3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -73,7 +73,7 @@ public TableDataManager getTableDataManager(TableConfig tableConfig, @Nullable E
        }
        break;
      case REALTIME:
-        Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
        if (Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE))
            && StringUtils.isEmpty(_instanceDataManagerConfig.getSegmentStoreUri())) {
          throw new IllegalStateException(String.format("Table has enabled %s config. But the server has not "
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 684e1ffa531c..380b358a84ed 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -282,7 +282,14 @@ public void deleteSegmentFile() {
  private static final int MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS = 31;

  private Thread _consumerThread;
+  // _partitionGroupId is Pinot's internal partition number, which is eventually used as part of the segment name.
+  // _streamPartitionGroupId is the partition number in the stream topic; it is derived from _partitionGroupId and
+  // identifies which partition of the stream topic this consumer is consuming from.
+  // Note that in the traditional single-topic ingestion mode the two concepts are identical; they only diverge in
+  // multi-topic ingestion mode.
  private final int _partitionGroupId;
+  private final int _streamPartitionGroupId;
  private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus;
  final String _clientId;
  private final TransformPipeline _transformPipeline;
@@ -1496,12 +1503,16 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
    String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
    // TODO Validate configs
    IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
-    _streamConfig = new StreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(_tableConfig));
+    _partitionGroupId = llcSegmentName.getPartitionGroupId();
+    _streamPartitionGroupId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_partitionGroupId);
+    _streamConfig = new StreamConfig(
+        _tableNameWithType,
+        IngestionConfigUtils.getStreamConfigMaps(_tableConfig)
+            .get(IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(_partitionGroupId)));
    _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
    _streamPartitionMsgOffsetFactory = _streamConsumerFactory.createStreamMsgOffsetFactory();
    String streamTopic = _streamConfig.getTopicName();
    _segmentNameStr = _segmentZKMetadata.getSegmentName();
-    _partitionGroupId = llcSegmentName.getPartitionGroupId();
    _partitionGroupConsumptionStatus =
        new PartitionGroupConsumptionStatus(_partitionGroupId, llcSegmentName.getSequenceNumber(),
            _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getStartOffset()),
@@ -1514,9 +1525,9 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
    String clientIdSuffix =
        instanceDataManagerConfig != null ? instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
    if (StringUtils.isNotBlank(clientIdSuffix)) {
-      _clientId = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId + "-" + clientIdSuffix;
+      _clientId = _tableNameWithType + "-" + streamTopic + "-" + _streamPartitionGroupId + "-" + clientIdSuffix;
    } else {
-      _clientId = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId;
+      _clientId = _tableNameWithType + "-" + streamTopic + "-" + _streamPartitionGroupId;
    }
    _segmentLogger = LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
    _tableStreamName = _tableNameWithType + "_" + streamTopic;
@@ -1832,7 +1843,8 @@ private void recreateStreamConsumer(String reason) {
  private void createPartitionMetadataProvider(String reason) {
    closePartitionMetadataProvider();
    _segmentLogger.info("Creating new partition metadata provider, reason: {}", reason);
-    _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId);
+    _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(
+        _clientId, _streamPartitionGroupId);
  }

  private void updateIngestionMetrics(RowMetadata metadata) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index 33a3b55654b2..4224019ab0e1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -47,7 +47,7 @@ public SegmentCommitterFactory(Logger segmentLogger, ServerSegmentCompletionProt
    _protocolHandler = protocolHandler;
    _tableConfig = tableConfig;
    _streamConfig = new StreamConfig(_tableConfig.getTableName(),
-        IngestionConfigUtils.getStreamConfigMap(_tableConfig));
+        IngestionConfigUtils.getStreamConfigMaps(_tableConfig).get(0));
    _indexLoadingConfig = indexLoadingConfig;
    _serverMetrics = serverMetrics;
  }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 387f69a44269..141e0c280a93 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -169,15 +169,22 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema, @N
    // Only allow realtime tables with non-null stream.type and LLC consumer.type
    if (tableConfig.getTableType() == TableType.REALTIME) {
-      Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+      List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig);
+      if (streamConfigMaps.size() > 1) {
+        Preconditions.checkArgument(!tableConfig.isUpsertEnabled(),
+            "Multiple stream configs are not supported for upsert tables");
+      }
+      // TODO: validate that the stream configs in the list are identical in most fields
      StreamConfig streamConfig;
-      try {
-        // Validate that StreamConfig can be created
-        streamConfig = new StreamConfig(tableConfig.getTableName(), streamConfigMap);
-      } catch (Exception e) {
-        throw new IllegalStateException("Could not create StreamConfig using the streamConfig map", e);
+      for (Map<String, String> streamConfigMap : streamConfigMaps) {
+        try {
+          // Validate that StreamConfig can be created
+          streamConfig = new StreamConfig(tableConfig.getTableName(), streamConfigMap);
+        } catch (Exception e) {
+          throw new IllegalStateException("Could not create StreamConfig using the streamConfig map", e);
+        }
+        validateStreamConfig(streamConfig);
      }
-      validateStreamConfig(streamConfig);
    }
    validateTierConfigList(tableConfig.getTierConfigsList());
    validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
@@ -390,7 +397,8 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
      Preconditions.checkState(indexingConfig == null || MapUtils.isEmpty(indexingConfig.getStreamConfigs()),
          "Should not use indexingConfig#getStreamConfigs if ingestionConfig#StreamIngestionConfig is provided");
      List<Map<String, String>> streamConfigMaps = ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
-      Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream is supported in REALTIME table");
+      Preconditions.checkState(streamConfigMaps.size() > 0, "Must have at least 1 stream in REALTIME table");
+      // TODO: for multiple stream configs, validate them
    }

    // Filter config
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 98b0ba552c18..72a17ee7d1c6 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -684,12 +684,11 @@ public void ingestionStreamConfigsTest() {
        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
            .setIngestionConfig(ingestionConfig).build();

-    // only 1 stream config allowed
+    // Multiple stream configs are allowed
    try {
      TableConfigUtils.validateIngestionConfig(tableConfig, null);
-      Assert.fail("Should fail for more than 1 stream config");
    } catch (IllegalStateException e) {
-      // expected
+      Assert.fail("Multiple stream configs should be supported");
    }

    // stream config should be valid
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
index d519a2302917..bc02df8462dd 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
@@ -18,6 +18,9 @@
 */
 package org.apache.pinot.spi.stream;

+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+
+
 /**
 * A PartitionGroup is a group of partitions/shards that the same consumer should consume from.
 * This class contains all information which describes the latest state of a partition group.
@@ -36,6 +39,7 @@ public class PartitionGroupConsumptionStatus {

  private final int _partitionGroupId;
+  private final int _streamPartitionGroupId;
  private int _sequenceNumber;
  private StreamPartitionMsgOffset _startOffset;
  private StreamPartitionMsgOffset _endOffset;
@@ -44,6 +48,7 @@ public class PartitionGroupConsumptionStatus {
  public PartitionGroupConsumptionStatus(int partitionGroupId, int sequenceNumber,
      StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, String status) {
    _partitionGroupId = partitionGroupId;
+    _streamPartitionGroupId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId);
    _sequenceNumber = sequenceNumber;
    _startOffset = startOffset;
    _endOffset = endOffset;
@@ -54,6 +59,10 @@ public int getPartitionGroupId() {
    return _partitionGroupId;
  }

+  public int getStreamPartitionGroupId() {
+    return _streamPartitionGroupId;
+  }
+
  public int getSequenceNumber() {
    return _sequenceNumber;
  }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 98094b9e88cb..158e28ce728c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -18,33 +18,35 @@
 */
 package org.apache.pinot.spi.stream;

+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;


 /**
- * Fetches the list of {@link PartitionGroupMetadata} for all partition groups of the stream,
+ * Fetches the list of {@link PartitionGroupMetadata} for all partition groups of the streams,
 * using the {@link StreamMetadataProvider}
 */
public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);

-  private List<PartitionGroupMetadata> _newPartitionGroupMetadataList;
-  private final StreamConfig _streamConfig;
+  private final List<PartitionGroupMetadata> _newPartitionGroupMetadataList;
+  private final List<StreamConfig> _streamConfigs;
  private final List<PartitionGroupConsumptionStatus> _partitionGroupConsumptionStatusList;
-  private final StreamConsumerFactory _streamConsumerFactory;
  private Exception _exception;
-  private final String _topicName;
+  private final List<String> _topicNames;

-  public PartitionGroupMetadataFetcher(StreamConfig streamConfig,
+  public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs,
      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) {
-    _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
-    _topicName = streamConfig.getTopicName();
-    _streamConfig = streamConfig;
+    _topicNames = streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList());
+    _streamConfigs = streamConfigs;
    _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
+    _newPartitionGroupMetadataList = new ArrayList<>();
  }

  public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
@@ -63,25 +65,43 @@ public Exception getException() {
  @Override
  public Boolean call()
      throws Exception {
-    String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-"
-        + _streamConfig.getTableNameWithType() + "-" + _topicName;
-    try (
-        StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) {
-      _newPartitionGroupMetadataList = streamMetadataProvider.computePartitionGroupMetadata(clientId, _streamConfig,
-          _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000);
-      if (_exception != null) {
-        // We had at least one failure, but succeeded now. Log an info
-        LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic {}", _topicName);
+    _newPartitionGroupMetadataList.clear();
+    for (int i = 0; i < _streamConfigs.size(); i++) {
+      String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-"
+          + _streamConfigs.get(i).getTableNameWithType() + "-" + _topicNames.get(i);
+      StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfigs.get(i));
+      final int index = i;
+      List<PartitionGroupConsumptionStatus> topicPartitionGroupConsumptionStatusList =
+          _partitionGroupConsumptionStatusList.stream()
+              .filter(partitionGroupConsumptionStatus ->
+                  IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
+                      partitionGroupConsumptionStatus.getPartitionGroupId()) == index)
+              .collect(Collectors.toList());
+      try (
+          StreamMetadataProvider streamMetadataProvider =
+              streamConsumerFactory.createStreamMetadataProvider(clientId)) {
+        _newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId,
+            _streamConfigs.get(i),
+            topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000).stream().map(
+            metadata -> new PartitionGroupMetadata(
+                IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(
+                    metadata.getPartitionGroupId(), index),
+                metadata.getStartOffset())).collect(Collectors.toList())
+        );
+        if (_exception != null) {
+          // We had at least one failure, but succeeded now. Log an info
+          LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic {}", _topicNames.get(i));
+        }
+      } catch (TransientConsumerException e) {
+        LOGGER.warn("Transient Exception: Could not get partition count for topic {}", _topicNames.get(i), e);
+        _exception = e;
+        return Boolean.FALSE;
+      } catch (Exception e) {
+        LOGGER.warn("Could not get partition count for topic {}", _topicNames.get(i), e);
+        _exception = e;
+        throw e;
      }
-      return Boolean.TRUE;
-    } catch (TransientConsumerException e) {
-      LOGGER.warn("Transient Exception: Could not get partition count for topic {}", _topicName, e);
-      _exception = e;
-      return Boolean.FALSE;
-    } catch (Exception e) {
-      LOGGER.warn("Could not get partition count for topic {}", _topicName, e);
-      _exception = e;
-      throw e;
    }
+    return Boolean.TRUE;
  }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 39d061473e35..e52610dd6771 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -223,7 +223,7 @@ public Boolean isServerUploadToDeepStore() {
    return _serverUploadToDeepStore;
  }

-  private double extractFlushThresholdVarianceFraction(Map<String, String> streamConfigMap) {
+  public static double extractFlushThresholdVarianceFraction(Map<String, String> streamConfigMap) {
    String key = StreamConfigProperties.FLUSH_THRESHOLD_VARIANCE_FRACTION;
    String flushThresholdVarianceFractionStr = streamConfigMap.get(key);
    if (flushThresholdVarianceFractionStr != null) {
@@ -245,7 +245,7 @@ private double extractFlushThresholdVarianceFraction(Map<String, String> streamC
    }
  }

-  private long extractFlushThresholdSegmentSize(Map<String, String> streamConfigMap) {
+  public static long extractFlushThresholdSegmentSize(Map<String, String> streamConfigMap) {
    String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE;
    String flushThresholdSegmentSizeStr = streamConfigMap.get(key);
    if (flushThresholdSegmentSizeStr == null) {
@@ -264,7 +264,7 @@ private long extractFlushThresholdSegmentSize(Map<String, String> streamConfigMa
    }
  }

-  protected int extractFlushThresholdRows(Map<String, String> streamConfigMap) {
+  public static int extractFlushThresholdRows(Map<String, String> streamConfigMap) {
    String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS;
    String flushThresholdRowsStr = streamConfigMap.get(key);
    if (flushThresholdRowsStr == null) {
@@ -288,7 +288,7 @@ protected int extractFlushThresholdRows(Map<String, String> streamConfigMap) {
    }
  }

-  protected int extractFlushThresholdSegmentRows(Map<String, String> streamConfigMap) {
+  public static int extractFlushThresholdSegmentRows(Map<String, String> streamConfigMap) {
    String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_ROWS;
    String flushThresholdSegmentRowsStr = streamConfigMap.get(key);
    if (flushThresholdSegmentRowsStr != null) {
@@ -302,7 +302,7 @@ protected int extractFlushThresholdSegmentRows(Map<String, String> streamConfigM
    }
  }

-  protected long extractFlushThresholdTimeMillis(Map<String, String> streamConfigMap) {
+  public static long extractFlushThresholdTimeMillis(Map<String, String> streamConfigMap) {
    String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME;
    String flushThresholdTimeStr = streamConfigMap.get(key);
    if (flushThresholdTimeStr == null) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 812b7b8e0f92..a8c4d22cc32a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -59,7 +59,7 @@ public StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory() {
   */
  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
      PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
-    return createPartitionLevelConsumer(clientId, partitionGroupConsumptionStatus.getPartitionGroupId());
+    return createPartitionLevelConsumer(clientId, partitionGroupConsumptionStatus.getStreamPartitionGroupId());
  }

  @Deprecated
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 85bb2801a1f6..052993a6d0fb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -81,7 +81,7 @@ default List<PartitionGroupMetadata> computePartitionGroupMetadata(String client
    // If partition group is still in progress, this value will be null
    for (PartitionGroupConsumptionStatus currentPartitionGroupConsumptionStatus : partitionGroupConsumptionStatuses) {
      newPartitionGroupMetadataList.add(
-          new PartitionGroupMetadata(currentPartitionGroupConsumptionStatus.getPartitionGroupId(),
+          new PartitionGroupMetadata(currentPartitionGroupConsumptionStatus.getStreamPartitionGroupId(),
              currentPartitionGroupConsumptionStatus.getEndOffset()));
    }
    // Add PartitionGroupMetadata for new partitions
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index 2aeba4160bf4..81e2d9655a4b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -19,6 +19,7 @@ package org.apache.pinot.spi.utils; import com.google.common.base.Preconditions; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,6 +30,7 @@ import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; +import org.apache.pinot.spi.stream.StreamConfig; /** @@ -46,15 +48,100 @@ private IngestionConfigUtils() { private static final int DEFAULT_PUSH_ATTEMPTS = 5; private static final int DEFAULT_PUSH_PARALLELISM = 1; private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L; + // For partition from different topics, we pad then with an offset to avoid collision. The offset is far higher + // than the normal max number of partitions on stream (e.g. 512). + public static final int PARTITION_PADDING_OFFSET = 10000; + public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING = + "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory"; + public static final String STREAM_TYPE = "streamType"; + public static final String STREAM_CONSUMER_FACTORY_CLASS = "stream.consumer.factory.class"; /** * Fetches the streamConfig from the given realtime table. * First, the ingestionConfigs->stream->streamConfigs will be checked. * If not found, the indexingConfig->streamConfigs will be checked (which is deprecated). * @param tableConfig realtime table config - * @return streamConfigs map + * @return streamConfigs List of maps */ - public static Map getStreamConfigMap(TableConfig tableConfig) { + public static List> getStreamConfigMaps(TableConfig tableConfig) { + String tableNameWithType = tableConfig.getTableName(); + Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, + "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType); + if (tableConfig.getIngestionConfig() != null + && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { + List> streamConfigMaps = + tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps(); + Preconditions.checkState(!streamConfigMaps.isEmpty(), "Table must have at least 1 stream"); + /* + Apply the following checks if there are multiple streamConfigs + 1. Check if all streamConfigs have the same stream type. TODO: remove this limitation once we've tested it + 2. Ensure segment flush parameters consistent across all streamConfigs. We need this because Pinot is predefining + the values before fetching stream partition info from stream. At the construction time, we don't know the value + extracted from a streamConfig would be applied to which segment. + TODO: remove this limitation once we've refactored the code and supported it. 
+       */
+      Map<String, String> firstStreamConfigMap = streamConfigMaps.get(0);
+      for (int i = 1; i < streamConfigMaps.size(); i++) {
+        Map<String, String> map = streamConfigMaps.get(i);
+        Preconditions.checkNotNull(map.get(STREAM_TYPE),
+            "streamType must be defined for all streamConfigs for REALTIME table: %s", tableNameWithType);
+        Preconditions.checkState(StringUtils.equals(map.get(STREAM_TYPE), firstStreamConfigMap.get(STREAM_TYPE))
+                && StreamConfig.extractFlushThresholdRows(map) == StreamConfig.extractFlushThresholdRows(
+                firstStreamConfigMap)
+                && StreamConfig.extractFlushThresholdTimeMillis(map) == StreamConfig.extractFlushThresholdTimeMillis(
+                firstStreamConfigMap)
+                && StreamConfig.extractFlushThresholdVarianceFraction(map)
+                == StreamConfig.extractFlushThresholdVarianceFraction(firstStreamConfigMap)
+                && StreamConfig.extractFlushThresholdSegmentSize(map) == StreamConfig.extractFlushThresholdSegmentSize(
+                firstStreamConfigMap)
+                && StreamConfig.extractFlushThresholdSegmentRows(map) == StreamConfig.extractFlushThresholdSegmentRows(
+                firstStreamConfigMap),
+            "All streamConfigs must have the same stream type and flush thresholds for REALTIME table: %s",
+            tableNameWithType);
+      }
+      return streamConfigMaps;
+    }
+    if (tableConfig.getIndexingConfig() != null && tableConfig.getIndexingConfig().getStreamConfigs() != null) {
+      return Arrays.asList(tableConfig.getIndexingConfig().getStreamConfigs());
+    }
+    throw new IllegalStateException("Could not find streamConfigs for REALTIME table: " + tableNameWithType);
+  }
+
+  /**
+   * Gets the Pinot segment-level partition id from the stream partition id.
+   * @param partitionId the partition group id from the stream
+   * @param index the index of the StreamConfig in the list of StreamConfigs
+   * @return the Pinot-side (padded) partition group id
+   */
+  public static int getPinotPartitionIdFromStreamPartitionId(int partitionId, int index) {
+    return index * PARTITION_PADDING_OFFSET + partitionId;
+  }
+
+  /**
+   * Gets the stream partition id from the Pinot segment partition id.
+   * @param partitionId the segment partition group id on Pinot
+   * @return the partition id on the underlying stream
+   */
+  public static int getStreamPartitionIdFromPinotPartitionId(int partitionId) {
+    return partitionId % PARTITION_PADDING_OFFSET;
+  }
+
+  /**
+   * Gets the StreamConfig index in the StreamConfigs list from the Pinot segment partition id.
+   * @param partitionId the segment partition group id on Pinot
+   * @return the index of the corresponding StreamConfig
+   */
+  public static int getStreamConfigIndexFromPinotPartitionId(int partitionId) {
+    return partitionId / PARTITION_PADDING_OFFSET;
+  }
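Taken together, the three helpers above define a simple bijection between (streamConfig index, stream partition) pairs and Pinot partition group ids. A worked example, assuming PARTITION_PADDING_OFFSET stays at 10000:

    import org.apache.pinot.spi.utils.IngestionConfigUtils;

    public class PartitionPaddingExample {
      public static void main(String[] args) {
        // Stream partition 3 of the second configured stream (index 1)
        // maps to Pinot partition group id 1 * 10000 + 3 = 10003.
        int pinotPartitionId = IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(3, 1);
        System.out.println(pinotPartitionId); // 10003

        // Both components are recoverable from the padded id.
        System.out.println(IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(10003)); // 3
        System.out.println(IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(10003)); // 1
      }
    }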
+
+  /**
+   * Fetches the streamConfig from the list of streamConfigs according to the partitionGroupId.
+   * @param tableConfig realtime table config
+   * @param partitionGroupId the Pinot-side partition group id
+   * @return streamConfig map
+   */
+  public static Map<String, String> getStreamConfigMapWithPartitionGroupId(
+      TableConfig tableConfig, int partitionGroupId) {
     String tableNameWithType = tableConfig.getTableName();
     Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
         "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
@@ -63,10 +150,13 @@ public static Map<String, String> getStreamConfigMap(TableConfig tableConfig) {
         && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
       List<Map<String, String>> streamConfigMaps =
           tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
-      Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream supported per table");
-      streamConfigMap = streamConfigMaps.get(0);
+      Preconditions.checkState(
+          streamConfigMaps.size() > partitionGroupId / PARTITION_PADDING_OFFSET,
+          "Table does not have enough streams configured for the given partition group id");
+      streamConfigMap = streamConfigMaps.get(partitionGroupId / PARTITION_PADDING_OFFSET);
     }
-    if (streamConfigMap == null && tableConfig.getIndexingConfig() != null) {
+    if (partitionGroupId < PARTITION_PADDING_OFFSET
+        && streamConfigMap == null && tableConfig.getIndexingConfig() != null) {
       streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
     }
     if (streamConfigMap == null) {
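A usage sketch for the lookup above; tableConfig is assumed to be a REALTIME config whose StreamIngestionConfig carries two streamConfigMaps:

    import java.util.Map;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.utils.IngestionConfigUtils;

    public class StreamConfigLookupExample {
      // Resolves the streamConfig for a padded Pinot partition group id.
      static Map<String, String> resolve(TableConfig tableConfig) {
        // 10003 / PARTITION_PADDING_OFFSET == 1, so this returns streamConfigMaps.get(1).
        // Ids below PARTITION_PADDING_OFFSET can still fall back to the deprecated
        // indexingConfig.streamConfigs when no ingestion config is present.
        return IngestionConfigUtils.getStreamConfigMapWithPartitionGroupId(tableConfig, 10003);
      }
    }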
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
index b2b4c87b29e5..1e9517a33011 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
@@ -22,6 +22,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
@@ -44,7 +45,7 @@ public class IngestionConfigUtilsTest {
   public void testGetStreamConfigMap() {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
     try {
-      IngestionConfigUtils.getStreamConfigMap(tableConfig);
+      IngestionConfigUtils.getStreamConfigMaps(tableConfig);
       Assert.fail("Should fail for OFFLINE table");
     } catch (IllegalStateException e) {
       // expected
@@ -58,7 +61,7 @@ public void testGetStreamConfigMap() {
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(Collections.singletonList(streamConfigMap)));
     tableConfig.setIngestionConfig(ingestionConfig);
-    Map<String, String> actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+    Map<String, String> actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
     Assert.assertEquals(actualStreamConfigsMap.size(), 1);
     Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "kafka");
 
@@ -69,30 +72,30 @@ public void testGetStreamConfigMap() {
     IndexingConfig indexingConfig = new IndexingConfig();
     indexingConfig.setStreamConfigs(deprecatedStreamConfigMap);
     tableConfig.setIndexingConfig(indexingConfig);
-    actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+    actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
     Assert.assertEquals(actualStreamConfigsMap.size(), 1);
     Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "kafka");
 
-    // fail if multiple found
+    // Able to get multiple stream configs
     ingestionConfig.setStreamIngestionConfig(
         new StreamIngestionConfig(Arrays.asList(streamConfigMap, deprecatedStreamConfigMap)));
     try {
-      IngestionConfigUtils.getStreamConfigMap(tableConfig);
-      Assert.fail("Should fail for multiple stream configs");
+      List<Map<String, String>> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig);
+      Assert.assertEquals(streamConfigs.size(), 2);
     } catch (IllegalStateException e) {
       // expected
     }
 
     // get from indexing config
     tableConfig.setIngestionConfig(null);
-    actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+    actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
     Assert.assertEquals(actualStreamConfigsMap.size(), 2);
     Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "foo");
 
     // fail if found nowhere
     tableConfig.setIndexingConfig(new IndexingConfig());
     try {
-      IngestionConfigUtils.getStreamConfigMap(tableConfig);
+      IngestionConfigUtils.getStreamConfigMaps(tableConfig);
       Assert.fail("Should fail for no stream config found");
     } catch (IllegalStateException e) {
       // expected
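For reference, a minimal sketch of wiring two stream configs onto one table the way the updated test does; the topic names are made up, and both maps share a stream type so the consistency check in getStreamConfigMaps passes:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
    import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
    import org.apache.pinot.spi.utils.IngestionConfigUtils;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    public class MultiStreamTableExample {
      public static void main(String[] args) {
        Map<String, String> streamA = new HashMap<>();
        streamA.put("streamType", "kafka");
        streamA.put("stream.kafka.topic.name", "topicA"); // hypothetical topic

        Map<String, String> streamB = new HashMap<>();
        streamB.put("streamType", "kafka"); // must match streamA's type
        streamB.put("stream.kafka.topic.name", "topicB"); // hypothetical topic

        IngestionConfig ingestionConfig = new IngestionConfig();
        ingestionConfig.setStreamIngestionConfig(
            new StreamIngestionConfig(Arrays.asList(streamA, streamB)));

        TableConfig tableConfig =
            new TableConfigBuilder(TableType.REALTIME).setTableName("myTable").build();
        tableConfig.setIngestionConfig(ingestionConfig);

        List<Map<String, String>> streamConfigMaps =
            IngestionConfigUtils.getStreamConfigMaps(tableConfig);
        // streamConfigMaps.size() == 2; partition ids of topicB are padded by
        // PARTITION_PADDING_OFFSET on the Pinot side.
        System.out.println(streamConfigMaps.size());
      }
    }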