Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/mongodb) re-order aggregation logic #12428

33 changes: 17 additions & 16 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,26 +218,27 @@
"""

aggregations: List[Dict] = []

# The order of the aggregations impacts execution time. By setting the sample/limit aggregation first,
# the subsequent aggregations process a much smaller dataset, improving performance.
if sample_size:
if use_random_sampling:
aggregations.append({"$sample": {"size": sample_size}})

Check warning on line 226 in metadata-ingestion/src/datahub/ingestion/source/mongodb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mongodb.py#L224-L226

Added lines #L224 - L226 were not covered by tests
else:
aggregations.append({"$limit": sample_size})

Check warning on line 228 in metadata-ingestion/src/datahub/ingestion/source/mongodb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mongodb.py#L228

Added line #L228 was not covered by tests

if should_add_document_size_filter:
doc_size_field = "temporary_doc_size_field"
# create a temporary field to store the size of the document. filter on it and then remove it.
aggregations = [
{"$addFields": {doc_size_field: {"$bsonSize": "$$ROOT"}}},
{"$match": {doc_size_field: {"$lt": max_document_size}}},
{"$project": {doc_size_field: 0}},
]
if use_random_sampling:
# get sample documents in collection
if sample_size:
aggregations.append({"$sample": {"size": sample_size}})
documents = collection.aggregate(
aggregations,
allowDiskUse=True,
aggregations.extend(

Check warning on line 233 in metadata-ingestion/src/datahub/ingestion/source/mongodb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mongodb.py#L233

Added line #L233 was not covered by tests
[
{"$addFields": {doc_size_field: {"$bsonSize": "$$ROOT"}}},
{"$match": {doc_size_field: {"$lt": max_document_size}}},
{"$project": {doc_size_field: 0}},
]
)
else:
if sample_size:
aggregations.append({"$limit": sample_size})
documents = collection.aggregate(aggregations, allowDiskUse=True)

documents = collection.aggregate(aggregations, allowDiskUse=True)

Check warning on line 241 in metadata-ingestion/src/datahub/ingestion/source/mongodb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mongodb.py#L241

Added line #L241 was not covered by tests

return construct_schema(list(documents), delimiter)

Expand Down
Loading
Loading