Add multi stream ingestion support #13790

Merged · 12 commits · Dec 19, 2024
@@ -257,7 +257,7 @@ public void init(PinotConfiguration pinotConfiguration)
// This executor service is used to do async tasks from multiget util or table rebalancing.
_executorService = createExecutorService(_config.getControllerExecutorNumThreads(), "async-task-thread-%d");
_tenantRebalanceExecutorService = createExecutorService(_config.getControllerExecutorRebalanceNumThreads(),
"tenant-rebalance-thread-%d");
"tenant-rebalance-thread-%d");
_tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _tenantRebalanceExecutorService);
}

@@ -272,7 +272,7 @@ public void init(PinotConfiguration pinotConfiguration)
private ExecutorService createExecutorService(int numThreadPool, String threadNameFormat) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build();
return (numThreadPool <= 0) ? Executors.newCachedThreadPool(threadFactory)
: Executors.newFixedThreadPool(numThreadPool, threadFactory);
}
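For reference, a minimal standalone sketch of the helper's contract shown above, with Guava's ThreadFactoryBuilder replaced by a plain-JDK stand-in (class and method names here are illustrative, not part of the PR): a non-positive thread count yields an unbounded cached pool, a positive count a bounded fixed pool, both naming threads from the given pattern.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorChoiceSketch {

  // Plain-JDK stand-in for Guava's ThreadFactoryBuilder().setNameFormat(...).build().
  static ThreadFactory namedThreadFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> new Thread(runnable, prefix + "-" + counter.getAndIncrement());
  }

  // Mirrors the ternary in the hunk above: a count <= 0 means an unbounded cached pool.
  static ExecutorService createExecutorService(int numThreads, String namePrefix) {
    ThreadFactory threadFactory = namedThreadFactory(namePrefix);
    return (numThreads <= 0) ? Executors.newCachedThreadPool(threadFactory)
        : Executors.newFixedThreadPool(numThreads, threadFactory);
  }

  public static void main(String[] args) {
    ExecutorService pool = createExecutorService(4, "tenant-rebalance-thread");
    pool.submit(() -> System.out.println("running on " + Thread.currentThread().getName()));
    pool.shutdown();
  }
}
```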

private void inferHostnameIfNeeded(ControllerConf config) {
@@ -577,10 +577,12 @@ protected void configure() {
_helixResourceManager.getAllRealtimeTables().forEach(rt -> {
TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
if (tableConfig != null) {
Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig);
try {
- StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"),
-     streamConfigMap);
+ for (Map<String, String> streamConfigMap : streamConfigMaps) {
+   StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"),
+       streamConfigMap);
+ }
} catch (Exception e) {
existingHlcTables.add(rt);
}
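As a rough sketch of the new control flow, assuming plain Java collections and a stand-in validator in place of the Pinot classes (property keys, values, and the table name below are illustrative): every stream config map returned for the table is validated independently, and a failure on any one of them flags the table as a legacy high-level-consumer table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MultiStreamValidationSketch {

  // Stand-in for StreamConfig.validateConsumerType: reject the deprecated high-level consumer.
  static void validateConsumerType(String streamType, Map<String, String> streamConfigMap) {
    String consumerType = streamConfigMap.getOrDefault("stream." + streamType + ".consumer.type", "lowlevel");
    if (consumerType.equalsIgnoreCase("highlevel")) {
      throw new IllegalStateException("High-level consumer is not supported: " + consumerType);
    }
  }

  public static void main(String[] args) {
    // Two stream config maps, the shape getStreamConfigMaps(tableConfig) now returns for a
    // table that ingests from two topics (keys and values are made up for the example).
    List<Map<String, String>> streamConfigMaps = List.of(
        Map.of("streamType", "kafka", "stream.kafka.consumer.type", "lowlevel"),
        Map.of("streamType", "kafka", "stream.kafka.consumer.type", "highlevel"));

    List<String> existingHlcTables = new ArrayList<>();
    try {
      for (Map<String, String> streamConfigMap : streamConfigMaps) {
        validateConsumerType(streamConfigMap.getOrDefault("streamType", "kafka"), streamConfigMap);
      }
    } catch (Exception e) {
      existingHlcTables.add("myTable_REALTIME");  // any failing stream marks the whole table
    }
    System.out.println("Tables still using the high-level consumer: " + existingHlcTables);
  }
}
```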
@@ -26,6 +26,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+ import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -419,10 +420,11 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
numInvalidEndTime);

if (tableType == TableType.REALTIME && tableConfig != null) {
- StreamConfig streamConfig =
-     new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map(
+     streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig)
+ ).collect(Collectors.toList());
new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics,
- streamConfig).findAndEmitMetrics(idealState);
+ streamConfigs).findAndEmitMetrics(idealState);
}
}
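The conversion above follows a simple pattern: one StreamConfig per raw config map, all bound to the same table. A self-contained illustration with a plain record standing in for StreamConfig (table and topic names are made up for the example):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class StreamConfigListSketch {

  // Stand-in for org.apache.pinot.spi.stream.StreamConfig: table name plus raw stream properties.
  record SimpleStreamConfig(String tableNameWithType, Map<String, String> streamConfigMap) { }

  public static void main(String[] args) {
    String tableName = "myTable_REALTIME";
    List<Map<String, String>> streamConfigMaps = List.of(
        Map.of("streamType", "kafka", "stream.kafka.topic.name", "clicks"),
        Map.of("streamType", "kafka", "stream.kafka.topic.name", "impressions"));

    // Same shape as the hunk above: map each raw config map to a config object for the table.
    List<SimpleStreamConfig> streamConfigs = streamConfigMaps.stream()
        .map(map -> new SimpleStreamConfig(tableName, map))
        .collect(Collectors.toList());

    streamConfigs.forEach(config -> System.out.println(
        config.tableNameWithType() + " <- " + config.streamConfigMap().get("stream.kafka.topic.name")));
  }
}
```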

@@ -54,6 +54,7 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n
/**
* Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream,
* with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups.
* In particular, this method can also be used to fetch from multiple stream topics.
*
* Reasons why <code>partitionGroupConsumptionStatusList</code> is needed:
*
@@ -79,23 +80,24 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n
* the collection of shards in partition group 1, should remain unchanged in the response,
* whereas shards 3,4 can be added to new partition groups if needed.
*
- * @param streamConfig the streamConfig from the tableConfig
+ * @param streamConfigs the List of streamConfig from the tableConfig
* @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current
* partition groups.
* The size of this list is equal to the number of partition groups,
* and is created using the latest segment zk metadata.
*/
public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
Review comment (Contributor): Deprecate the old method or remove it. Please also clean up all usages of the old one.

Reply (Contributor Author): Removed.
List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) {
PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
- new PartitionGroupMetadataFetcher(streamConfig, partitionGroupConsumptionStatusList);
+ new PartitionGroupMetadataFetcher(streamConfigs, partitionGroupConsumptionStatusList);
try {
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
} catch (Exception e) {
Exception fetcherException = partitionGroupMetadataFetcher.getException();
LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", streamConfig.getTopicName(),
streamConfig.getTableNameWithType(), fetcherException);
LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}",
streamConfigs.stream().map(streamConfig -> streamConfig.getTopicName()).reduce((a, b) -> a + "," + b),
streamConfigs.get(0).getTableNameWithType(), fetcherException);
throw new RuntimeException(fetcherException);
}
}
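One subtlety in the new error log: Stream.reduce with a string concatenator returns an Optional<String>, so the logged topic list renders as Optional[...], whereas Collectors.joining produces a plain String. A tiny comparison of the two idioms (an aside for illustration, not part of the PR):

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class TopicJoinSketch {
  public static void main(String[] args) {
    List<String> topicNames = List.of("clicks", "impressions");

    // What the reduce-based argument evaluates to: an Optional wrapper around the joined string.
    Optional<String> reduced = topicNames.stream().reduce((a, b) -> a + "," + b);
    System.out.println(reduced);   // Optional[clicks,impressions]

    // Collectors.joining yields the plain comma-separated string directly.
    String joined = topicNames.stream().collect(Collectors.joining(","));
    System.out.println(joined);    // clicks,impressions
  }
}
```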
@@ -24,7 +24,9 @@
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
+ import java.util.List;
import java.util.Map;
+ import java.util.stream.Collectors;
import org.apache.helix.AccessOption;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -65,25 +67,29 @@ public class MissingConsumingSegmentFinder {
private ControllerMetrics _controllerMetrics;

public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore,
- ControllerMetrics controllerMetrics, StreamConfig streamConfig) {
+ ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs) {
_realtimeTableName = realtimeTableName;
_controllerMetrics = controllerMetrics;
_segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics);
_streamPartitionMsgOffsetFactory =
- StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+ StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();

// create partition group id to largest stream offset map
_partitionGroupIdToLargestStreamOffsetMap = new HashMap<>();
- streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA);
+ streamConfigs.forEach(streamConfig -> streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA));
try {
- PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, Collections.emptyList())
+ PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList())
.forEach(metadata -> {
_partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
});
} catch (Exception e) {
LOGGER.warn("Problem encountered in fetching stream metadata for topic: {} of table: {}. "
LOGGER.warn("Problem encountered in fetching stream metadata for topics: {} of table: {}. "
+ "Continue finding missing consuming segment only with ideal state information.",
- streamConfig.getTopicName(), streamConfig.getTableNameWithType());
+ streamConfigs.stream().map(streamConfig -> streamConfig.getTopicName()).collect(Collectors.toList()),
+ streamConfigs.get(0).getTableNameWithType());
}
}
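To summarize what the updated constructor does with its list, here is a standalone sketch under two stated assumptions: partition group ids coming back from different topics are treated as disjoint, and simple types stand in for StreamConfig, PartitionGroupMetadata, and the offset classes. Each stream config is switched to the largest-offset criteria, and the fetched partition group metadata is folded into one partition-group-id to start-offset map.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LargestOffsetMapSketch {

  // Stand-in for StreamConfig: topic name plus a mutable offset criteria.
  static class SimpleStreamConfig {
    final String topicName;
    String offsetCriteria = "smallest";
    SimpleStreamConfig(String topicName) { this.topicName = topicName; }
  }

  // Stand-in for PartitionGroupMetadata: partition group id plus a numeric start offset.
  record PartitionGroupInfo(int partitionGroupId, long startOffset) { }

  public static void main(String[] args) {
    List<SimpleStreamConfig> streamConfigs =
        List.of(new SimpleStreamConfig("clicks"), new SimpleStreamConfig("impressions"));

    // Apply the largest-offset criteria to every stream config (forEach runs the side effect eagerly).
    streamConfigs.forEach(config -> config.offsetCriteria = "largest");
    streamConfigs.forEach(config -> System.out.println(config.topicName + " -> " + config.offsetCriteria));

    // Pretend the metadata fetcher returned one partition group per topic, with disjoint ids.
    List<PartitionGroupInfo> fetchedMetadata =
        List.of(new PartitionGroupInfo(0, 120L), new PartitionGroupInfo(1, 45L));

    Map<Integer, Long> partitionGroupIdToLargestStreamOffset = new HashMap<>();
    fetchedMetadata.forEach(m -> partitionGroupIdToLargestStreamOffset.put(m.partitionGroupId(), m.startOffset()));
    System.out.println(partitionGroupIdToLargestStreamOffset);  // {0=120, 1=45}
  }
}
```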
