Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/mongodb) re-order aggregation logic #12428

31 changes: 15 additions & 16 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,26 +218,25 @@
"""

aggregations: List[Dict] = []

if sample_size:
if use_random_sampling:
aggregations.append({"$sample": {"size": sample_size}})

Check warning on line 224 in metadata-ingestion/src/datahub/ingestion/source/mongodb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mongodb.py#L222-L224

Added lines #L222 - L224 were not covered by tests
else:
aggregations.append({"$limit": sample_size})

Check warning on line 226 in metadata-ingestion/src/datahub/ingestion/source/mongodb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mongodb.py#L226

Added line #L226 was not covered by tests

if should_add_document_size_filter:
doc_size_field = "temporary_doc_size_field"
# create a temporary field to store the size of the document. filter on it and then remove it.
aggregations = [
{"$addFields": {doc_size_field: {"$bsonSize": "$$ROOT"}}},
{"$match": {doc_size_field: {"$lt": max_document_size}}},
{"$project": {doc_size_field: 0}},
]
if use_random_sampling:
# get sample documents in collection
if sample_size:
aggregations.append({"$sample": {"size": sample_size}})
documents = collection.aggregate(
aggregations,
allowDiskUse=True,
aggregations.extend(

Check warning on line 231 in metadata-ingestion/src/datahub/ingestion/source/mongodb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mongodb.py#L231

Added line #L231 was not covered by tests
[
{"$addFields": {doc_size_field: {"$bsonSize": "$$ROOT"}}},
{"$match": {doc_size_field: {"$lt": max_document_size}}},
{"$project": {doc_size_field: 0}},
]
)
else:
if sample_size:
aggregations.append({"$limit": sample_size})
documents = collection.aggregate(aggregations, allowDiskUse=True)

documents = collection.aggregate(aggregations, allowDiskUse=True)

Check warning on line 239 in metadata-ingestion/src/datahub/ingestion/source/mongodb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mongodb.py#L239

Added line #L239 was not covered by tests

return construct_schema(list(documents), delimiter)

Expand Down
Loading
Loading