diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py index f7d36165c9fde..3ffcb225db1c2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py @@ -893,10 +893,10 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]: if ( re.sub( r"\[.*?\]\.", - "", - field.fieldPath.lower(), - 0, - re.MULTILINE, + repl="", + string=field.fieldPath.lower(), + count=0, + flags=re.MULTILINE, ) == col.field_path.lower() ): @@ -990,7 +990,7 @@ def get_tables_for_dataset( ) items_to_get: Dict[str, TableListItem] = {} - for table_item in table_items.keys(): + for table_item in table_items: items_to_get[table_item] = table_items[table_item] if len(items_to_get) % max_batch_size == 0: yield from self.bigquery_data_dictionary.get_tables_for_dataset( diff --git a/metadata-ingestion/src/datahub/utilities/threaded_iterator_executor.py b/metadata-ingestion/src/datahub/utilities/threaded_iterator_executor.py index 55021e6c4715b..216fa155035d3 100644 --- a/metadata-ingestion/src/datahub/utilities/threaded_iterator_executor.py +++ b/metadata-ingestion/src/datahub/utilities/threaded_iterator_executor.py @@ -1,4 +1,5 @@ import concurrent.futures +import contextlib import queue from typing import Any, Callable, Generator, Iterable, Tuple, TypeVar @@ -38,10 +39,8 @@ def _worker_wrapper( while not out_q.empty(): yield out_q.get_nowait() else: - try: + with contextlib.suppress(queue.Empty): yield out_q.get(timeout=0.2) - except queue.Empty: - pass # Filter out the done futures. futures = [f for f in futures if not f.done()]