
Add multi stream ingestion support #13790

Merged — 12 commits merged into apache:master on Dec 19, 2024

Conversation

@lnbest0707-uber (Contributor) commented Aug 9, 2024

feature
Reference: #13780 Design Doc

Please refer to the design doc for details. TL;DR:

  • Add support to ingest from multiple sources into a single table
  • Use the existing interface (TableConfig) to define multiple streams
  • Separate the partition id definition between the stream and the Pinot segment (see the sketch below)
  • Compatible with the existing stream partition auto-expansion logic

The feature was tested on multiple Kafka topics with different decoder formats. Due to resource limitations, we were not able to test other upstream sources end-to-end.
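To make the partition-id separation above concrete, here is a minimal sketch of how a stream-local partition id could map into a table-level Pinot partition id so that each stream owns a disjoint id range. The class name, method names, and the 10,000 padding offset are illustrative assumptions, not the exact implementation in this PR:

// Hypothetical sketch of the stream-to-Pinot partition id separation.
// Assumes no single stream exposes more than PARTITION_PADDING_OFFSET partitions.
public final class PartitionIdMapper {
  private static final int PARTITION_PADDING_OFFSET = 10_000;

  private PartitionIdMapper() {
  }

  // Table-level (Pinot) partition id: each stream gets its own disjoint range.
  public static int toPinotPartitionId(int streamIndex, int streamPartitionId) {
    return streamIndex * PARTITION_PADDING_OFFSET + streamPartitionId;
  }

  // Recover the index of the stream inside streamConfigMaps.
  public static int toStreamIndex(int pinotPartitionId) {
    return pinotPartitionId / PARTITION_PADDING_OFFSET;
  }

  // Recover the stream-local partition id (e.g. the Kafka partition number).
  public static int toStreamPartitionId(int pinotPartitionId) {
    return pinotPartitionId % PARTITION_PADDING_OFFSET;
  }
}

Under this scheme, partition auto-expansion within a single stream keeps working unchanged, because new stream partitions land in that stream's reserved id range.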

Some TODOs:

  • Validation and limitations on multiple stream configs.
  • Standardize the usage of the StreamConfig object; e.g. some instances are only used to fetch non-topic-related static metadata and should use a different interface.
  • Support for adding/removing streams, plus the corresponding sanity checks.

From the user's point of view, the feature does not change any existing interfaces. Users can define the table config in the same way and combine it with any other transform functions or instance assignment strategies. A sample table ingestion config looks like:

"ingestionConfig": {
      "streamIngestionConfig": {
        "streamConfigMaps": [
          {
            "realtime.segment.flush.threshold.rows": "0",
            "stream.kafka.decoder.class.name": "xxxxDecoder",
            "streamType": "kafka",
            "stream.kafka.consumer.type": "lowlevel",
            "realtime.segment.flush.threshold.segment.size": "200MB",
            "stream.kafka.broker.list": "<host>:<port>",
            "realtime.segment.flush.threshold.time": "7200000",
            "stream.kafka.consumer.prop.auto.offset.reset": "largest",
            "stream.kafka.topic.name": "topicName1"
          },
          {
            "realtime.segment.flush.threshold.rows": "0",
            "stream.kafka.decoder.class.name": "xxxxDecoder",
            "streamType": "kafka",
            "stream.kafka.consumer.type": "lowlevel",
            "realtime.segment.flush.threshold.segment.size": "200MB",
            "stream.kafka.broker.list": "<host>:<port>"",
            "realtime.segment.flush.threshold.time": "7200000",
            "stream.kafka.consumer.prop.auto.offset.reset": "largest",
            "stream.kafka.topic.name": "topicName2"
          }
        ],
        "columnMajorSegmentBuilderEnabled": true
      },
      "transformConfigs": [
        {
          "columnName": "_ingestionEpochMs",
          "transformFunction": "__metadata$recordTimestamp"
        }
      ],
      "schemaConformingTransformerV2Config": {
        "enableIndexableExtras": true,
        "indexableExtrasField": "json_data",
        "enableUnindexableExtras": true,
        "unindexableExtrasField": "json_data_no_idx",
        "unindexableFieldSuffix": "_noindex",
        "fieldPathsToDrop": [],
        "fieldPathsToSkipStorage": [],
        "columnNameToJsonKeyPathMap": {},
        "mergedTextIndexField": "__mergedTextIndex",
        "useAnonymousDotInFieldNames": true,
        "optimizeCaseInsensitiveSearch": false,
        "reverseTextIndexKeyValueOrder": true,
        "mergedTextIndexDocumentMaxLength": 32766,
        "mergedTextIndexBinaryDocumentDetectionMinLength": 512,
        "fieldsToDoubleIngest": [],
        "jsonKeyValueSeparator": "\u001e",
        "mergedTextIndexBeginOfDocAnchor": "\u0002",
        "mergedTextIndexEndOfDocAnchor": "\u0003",
        "fieldPathsToPreserveInput": [],
        "fieldPathsToPreserveInputWithIndex": []
      },
      "continueOnError": false,
      "rowTimeValueCheck": false,
      "segmentTimeValueCheck": true
    }

@codecov-commenter commented Aug 10, 2024

Codecov Report

Attention: Patch coverage is 63.21839% with 64 lines in your changes missing coverage. Please review.

Project coverage is 64.03%. Comparing base (59551e4) to head (d8f46da).
Report is 1483 commits behind head on master.

Files with missing lines Patch % Lines
...g/apache/pinot/spi/utils/IngestionConfigUtils.java 53.33% 11 Missing and 3 partials ⚠️
...inot/spi/stream/PartitionGroupMetadataFetcher.java 65.71% 11 Missing and 1 partial ⚠️
.../core/realtime/PinotLLCRealtimeSegmentManager.java 84.21% 9 Missing ⚠️
...apache/pinot/controller/BaseControllerStarter.java 0.00% 5 Missing ⚠️
...x/core/realtime/MissingConsumingSegmentFinder.java 44.44% 5 Missing ⚠️
...r/validation/RealtimeSegmentValidationManager.java 0.00% 5 Missing ⚠️
...he/pinot/segment/local/utils/TableConfigUtils.java 50.00% 3 Missing and 2 partials ⚠️
...roller/helix/core/PinotTableIdealStateBuilder.java 0.00% 3 Missing ⚠️
.../helix/core/realtime/SegmentCompletionManager.java 0.00% 2 Missing ⚠️
...a/manager/realtime/RealtimeSegmentDataManager.java 87.50% 1 Missing ⚠️
... and 3 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #13790      +/-   ##
============================================
+ Coverage     61.75%   64.03%   +2.27%     
- Complexity      207     1605    +1398     
============================================
  Files          2436     2703     +267     
  Lines        133233   149053   +15820     
  Branches      20636    22849    +2213     
============================================
+ Hits          82274    95439   +13165     
- Misses        44911    46620    +1709     
- Partials       6048     6994     +946     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.95% <63.21%> (+2.24%) ⬆️
java-21 63.92% <63.21%> (+2.30%) ⬆️
skip-bytebuffers-false 63.97% <63.21%> (+2.22%) ⬆️
skip-bytebuffers-true 63.90% <63.21%> (+36.17%) ⬆️
temurin 64.03% <63.21%> (+2.27%) ⬆️
unittests 64.02% <63.21%> (+2.27%) ⬆️
unittests1 56.29% <33.70%> (+9.40%) ⬆️
unittests2 34.47% <52.29%> (+6.74%) ⬆️

Flags with carried forward coverage won't be shown.


@Jackie-Jiang added the feature, release-notes, ingestion, and real-time labels on Aug 12, 2024
@@ -686,9 +686,8 @@ public void ingestionStreamConfigsTest() {
// only 1 stream config allowed
Contributor:

nit, update comment

Contributor Author:

done

@@ -173,15 +173,18 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema, @N

// Only allow realtime tables with non-null stream.type and LLC consumer.type
if (tableConfig.getTableType() == TableType.REALTIME) {
Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig);
Contributor:

Let's add a validation to prevent creating an upsert table with multiple topics for now. One of the most important reasons is that an upsert table requires the same primary keys to be distributed to the same host, and it would be complex to validate whether all source topics are partitioned identically (partition key, partition count, partitioning algorithm).
There are other potential concerns as well, including race conditions in consumption during reload, rebalance, pause ingestion, etc.

Contributor Author:

This is a good idea, updated.
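A minimal sketch of the kind of guard agreed on here, assuming Guava's Preconditions and Pinot's UpsertConfig; the class name, method name, and message are illustrative, not the merged code:

import java.util.List;
import java.util.Map;

import com.google.common.base.Preconditions;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;

final class MultiStreamValidations {
  private MultiStreamValidations() {
  }

  // Illustrative: reject upsert tables with more than one stream, since upsert
  // requires identical primary-key partitioning across every source topic.
  static void validateUpsertSingleStream(TableConfig tableConfig,
      List<Map<String, String>> streamConfigMaps) {
    UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
    boolean isUpsert = upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE;
    Preconditions.checkState(!isUpsert || streamConfigMaps.size() == 1,
        "Multiple stream configs are not supported for upsert tables");
  }
}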

for (int i = 0; i < streamConfigs.size(); i++) {
final int index = i;
try {
partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream().map(
Contributor (@deemoliu, Sep 18, 2024):

Can you please elaborate on why we don't need to maintain order for partitionIds? We use a list for streamConfigs but an unordered set to store partition ids.

Contributor Author:

This function overloads an existing function with the same name, which also returns a Set<>. The output is only used to check whether a partitionId exists, not its order.
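For readers following along, a sketch of the overload shape under discussion — a Set is returned because callers only test membership. The mapping helper is the hypothetical PartitionIdMapper sketched in the PR description, not part of the patch:

// Sketch: union of partition ids across all streams, translated into the
// table-level id space. Order is irrelevant; callers only check existence.
public Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs)
    throws Exception {
  Set<Integer> partitionIds = new HashSet<>();
  for (int i = 0; i < streamConfigs.size(); i++) {
    final int index = i;
    partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream()
        .map(streamPartitionId -> PartitionIdMapper.toPinotPartitionId(index, streamPartitionId))
        .collect(Collectors.toSet()));
  }
  return partitionIds;
}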

@lnbest0707-uber force-pushed the upstream-fork/multi_topics branch from 0bfb91f to 1af31b2 on October 17, 2024
@lnbest0707-uber force-pushed the upstream-fork/multi_topics branch from 1af31b2 to 87dacbd on November 8, 2024
@lnbest0707-uber (Contributor Author) commented Nov 14, 2024

Adding the production running report:
The feature has been running in Uber's production environment with PBs of data for months. Hundreds of Pinot tables have been created, and a single table can ingest from 20-30+ topics with no issues. Overall ingestion and query performance is also competitive with common single-topic ingestion.

Note: the feature is running with Kafka multi-topic ingestion. We do not have the resources to run it with other stream types or with multiple types of streams.

Comment on lines 97 to 75
_streamPartitionMsgOffsetFactory =
StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
Collaborator:

I think this breaks when mixing streams that do not use the same offset factory type, e.g. Kinesis and Kafka (there's a lot of this specific pattern for the offset factory; I won't mark every instance).

We could add unit tests, or shall we add a TODO for them since we can't easily test this e2e internally?

Contributor Author:

I will add an enforcement check when we fetch the streamConfigs to require them to be the same for now.
In the long term, we need to redefine the structure of StreamConfig for this usage.
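A sketch of the enforcement check the author describes, assuming stream configs are plain maps keyed by "streamType"; the method name and message are illustrative:

// Illustrative: require every stream config to declare the same streamType so a
// single offset factory / consumer factory implementation serves all partitions.
static void validateSameStreamType(List<Map<String, String>> streamConfigMaps) {
  String streamType = streamConfigMaps.get(0).get("streamType");
  for (Map<String, String> streamConfigMap : streamConfigMaps) {
    Preconditions.checkState(streamType.equals(streamConfigMap.get("streamType")),
        "All stream configs must share the same streamType, got: %s vs %s",
        streamType, streamConfigMap.get("streamType"));
  }
}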

@@ -1294,7 +1315,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig st
selectStartOffset(offsetCriteria, partitionId, partitionIdToStartOffset,
partitionIdToSmallestOffset, tableConfig.getTableName(), offsetFactory,
latestSegmentZKMetadata.getEndOffset());
createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
createNewConsumingSegment(tableConfig, streamConfigs.get(0), latestSegmentZKMetadata, currentTimeMs,
Collaborator:

Can we use the partitionId to choose the correct streamConfig?

Otherwise we'd need to document that segment flush settings are only taken from the first streamConfig in the table config's list (though I suspect different flush settings per stream will eventually become a requirement).
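A sketch of the reviewer's suggestion, reusing the hypothetical partition-id padding from the PR description to resolve the owning stream config instead of hard-coding index 0:

// Illustrative: resolve the stream config that owns a table-level partition id,
// so per-stream flush settings come from the right entry in the list.
static StreamConfig streamConfigForPartition(List<StreamConfig> streamConfigs,
    int pinotPartitionId) {
  return streamConfigs.get(PartitionIdMapper.toStreamIndex(pinotPartitionId));
}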

@@ -87,6 +89,33 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt
}
}

public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore,
Collaborator:

The old constructor is no longer used; can we remove it and update the tests?

Contributor Author:

Removed

_streamConfigs = streamConfigs;
_partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
_newPartitionGroupMetadataList = new ArrayList<>();
}

public PartitionGroupMetadataFetcher(StreamConfig streamConfig,
Collaborator:

Similar here, let's remove the unused constructor.

Contributor Author:

Removed

* @param tableConfig realtime table config
* @return streamConfigs List of maps
*/
public static List<Map<String, String>> getStreamConfigMaps(TableConfig tableConfig) {
Collaborator:

Can we remove the old method if it is no longer used?

Contributor Author:

Removed

for (Map.Entry<Integer, LLCSegmentName> entry : partitionGroupIdToLatestSegment.entrySet()) {
int partitionGroupId = entry.getKey();
LLCSegmentName llcSegmentName = entry.getValue();
SegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(streamConfig.getTableNameWithType(), llcSegmentName.getSegmentName());
getSegmentZKMetadata(streamConfigs.get(0).getTableNameWithType(), llcSegmentName.getSegmentName());
Collaborator:

nit: idealState.getId() instead of .get(0)?

: getPartitionGroupConsumptionStatusList(idealState, streamConfig);
OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
: getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
// FIXME: Right now, we assume topics are sharing same offset criteria
Collaborator:

Does it make sense to add a precondition to check this?
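One possible precondition for the FIXME above, assuming OffsetCriteria implements value equality — a sketch, not the merged code:

// Illustrative: until per-stream criteria are supported, require one shared
// OffsetCriteria across every stream config.
OffsetCriteria offsetCriteria = streamConfigs.get(0).getOffsetCriteria();
for (StreamConfig streamConfig : streamConfigs) {
  Preconditions.checkState(offsetCriteria.equals(streamConfig.getOffsetCriteria()),
      "All streams must share the same offset criteria");
}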

* @param tableConfig realtime table config
* @return streamConfigs List of maps
*/
public static List<Map<String, String>> getStreamConfigMaps(TableConfig tableConfig) {
Contributor:

Add a @Deprecated annotation to the old getStreamConfigMap() if we are not removing it.

Contributor Author:

Removed the old one.

&& tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
List<Map<String, String>> streamConfigMaps =
tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have at least 1 stream");
Contributor:

(nit)

Suggested change:
- Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have at least 1 stream");
+ Preconditions.checkState(!streamConfigMaps.isEmpty(), "Table must have at least 1 stream");

Contributor Author:

Fixed

List<Map<String, String>> streamConfigMaps =
tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have at least 1 stream");
// For now, with multiple topics, we only support same type of stream (e.g. Kafka)
Contributor:

What is the reason for this limitation? Some comments explaining it would be good.
Also, only apply this check when there are multiple streams, to match the current behavior.

Contributor Author:

Added detailed explanations. Basically, we don't have the resources to cover testing of other stream types.

streamConfigMap);
for (Map<String, String> streamConfigMap : streamConfigMaps) {
StreamConfig.validateConsumerType(
streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"),
Contributor:

(format) This does not comply with Pinot style.

Contributor Author:

Not sure why, but the mvn checkstyle check passes.

* @param partitionGroupConsumptionStatusList
* @return
*/
public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
Contributor:

Deprecate the old method or remove it. Please also clean up all usages of the old one.

Contributor Author:

Removed

@lnbest0707-uber force-pushed the upstream-fork/multi_topics branch from 87dacbd to cae4dc5 on November 22, 2024
@lnbest0707-uber (Contributor Author):

Thanks @itschrispeck for identifying an edge-case issue and proposing the fix in commit 6bd1307, which addresses missing segments when partition metadata cannot be fetched from the stream.

@itschrispeck (Collaborator) left a comment:

Overall LGTM; I suggest we thoroughly document the current limitations (e.g. no mixed stream types).

lnbest0707-uber and others added 9 commits December 17, 2024 14:02
…ments

Summary:
Ensure transient exceptions do not prevent creating new consuming segments. If an exception is hit, attempt to reconcile any successful fetches with the partition group metadata.

This ensures consuming partitions are not dropped, and attempts to add any new partitions that were discovered successfully.

Test Plan:
After deployment, despite some remaining `TransientConsumerException` occurrences, no new missing consuming segments appeared.

Reviewers: gaoxin, tingchen

Reviewed By: gaoxin

JIRA Issues: EVA-8951

Differential Revision: https://code.uberinternal.com/D15748639
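A rough sketch of the reconciliation behavior this commit message describes; fetchPartitionGroupMetadata and lastKnownMetadataForStream are hypothetical helper names, not the actual methods in the patch:

// Illustrative: tolerate transient per-stream failures while fetching partition
// group metadata. Successful fetches are kept; for a stream that failed, fall
// back to its last known partition groups so its consuming segments are not
// dropped, while still adding partitions discovered on the healthy streams.
List<PartitionGroupMetadata> newPartitionGroupMetadataList = new ArrayList<>();
for (int i = 0; i < streamConfigs.size(); i++) {
  try {
    newPartitionGroupMetadataList.addAll(fetchPartitionGroupMetadata(streamConfigs.get(i)));
  } catch (TransientConsumerException e) {
    // Reconcile: reuse the previously known metadata for this stream instead of
    // treating all of its partitions as gone.
    newPartitionGroupMetadataList.addAll(lastKnownMetadataForStream(i));
  }
}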
@lnbest0707-uber force-pushed the upstream-fork/multi_topics branch from ba53fa3 to ad34f17 on December 17, 2024
@lnbest0707-uber (Contributor Author):

@Jackie-Jiang could you please review again and see whether any blockers remain before merging? Thanks

@kishoreg (Member):

Thanks again for contributing this feature. Is there a user doc associated with it?

@lnbest0707-uber (Contributor Author):

> Thanks again for contributing this feature. Is there a user doc associated with it?

@kishoreg thanks for bringing this up. After the PR is merged, I will update the Pinot docs with the feature and its usage. In general, users can define the table config in exactly the same way using the existing interfaces. I will provide an example in the PR description.

@chenboat merged commit 73abb21 into apache:master on Dec 19, 2024 — 21 checks passed
Labels: feature, ingestion, real-time, release-notes
7 participants