From ad3b7b499c73683b5fd117a167ce677bd1f5bad6 Mon Sep 17 00:00:00 2001 From: Shirshanka Das <shirshanka@apache.org> Date: Mon, 5 Sep 2022 23:37:19 -0700 Subject: [PATCH] fix(ingest): profiling - memory usage reduction (#5830) --- .../src/datahub/ingestion/source/ge_data_profiler.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index c415e4fc42e860..f45517b1f00572 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -1,3 +1,4 @@ +import collections import concurrent.futures import contextlib import dataclasses @@ -747,7 +748,7 @@ def generate_profiles( "great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset._get_column_quantiles_bigquery", _get_column_quantiles_bigquery_patch, ): - async_profiles = [ + async_profiles = collections.deque( async_executor.submit( self._generate_profile_from_request, query_combiner, @@ -756,12 +757,13 @@ def generate_profiles( profiler_args=profiler_args, ) for request in requests - ] + ) # Avoid using as_completed so that the results are yielded in the # same order as the requests. # for async_profile in concurrent.futures.as_completed(async_profiles): - for async_profile in async_profiles: + while len(async_profiles) > 0: + async_profile = async_profiles.popleft() yield async_profile.result() total_time_taken = timer.elapsed_seconds()