Skip to content

Commit

Permalink
implement multi threading
Browse files Browse the repository at this point in the history
  • Loading branch information
ulixius9 committed Oct 28, 2024
1 parent a8ee49a commit 54ce46a
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 60 deletions.
142 changes: 91 additions & 51 deletions ingestion/src/metadata/ingestion/source/database/lineage_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
import os
import traceback
from abc import ABC
from typing import Iterable, Iterator, Union
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from typing import Callable, Iterable, Iterator, Union

from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, SqlQuery
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.db_utils import get_view_lineage
from metadata.utils.logger import ingestion_logger
Expand All @@ -45,6 +48,8 @@ class LineageSource(QueryParserSource, ABC):
- schema
"""

dialect: Dialect

def yield_table_queries_from_logs(self) -> Iterator[TableQuery]:
"""
Method to handle the usage from query logs
Expand Down Expand Up @@ -91,6 +96,28 @@ def get_table_query(self) -> Iterator[TableQuery]:
)
yield from self.yield_table_query()

def generate_lineage_in_thread(self, producer_fn: Callable, processor_fn: Callable):
    """
    Submit every item produced by `producer_fn` to a thread pool and yield
    the results that `processor_fn` returns for each of them.

    Args:
        producer_fn: zero-argument callable returning an iterable of inputs.
        processor_fn: callable that takes one produced input and returns an
            iterable of results.

    Yields:
        Each element of each iterable returned by `processor_fn`. Futures
        are consumed in completion order, not submission order.

    Raises:
        concurrent.futures.TimeoutError: raised by `as_completed` when not
            every future finished within `parsingTimeoutLimit` seconds.
            This happens while advancing the loop, outside the per-future
            `try`, so it is not swallowed here.

    An exception raised while collecting the results of a single input is
    logged and does not stop the remaining inputs.
    """
    # NOTE(review): if `processor_fn` is a generator function, `submit` only
    # creates the generator object in the worker; its body runs later, during
    # `yield from results`, on the thread consuming this generator.
    with ThreadPoolExecutor(max_workers=self.source_config.threads) as executor:
        # Remember which input each future belongs to, so that a failure is
        # reported against the input that caused it (and not against the last
        # item produced, which may not even exist for an empty producer).
        future_to_input = {
            executor.submit(processor_fn, produced_input): produced_input
            for produced_input in producer_fn()
        }

        for future in as_completed(
            future_to_input, timeout=self.source_config.parsingTimeoutLimit
        ):
            try:
                results = future.result(
                    timeout=self.source_config.parsingTimeoutLimit
                )
                yield from results
            except Exception as exc:
                failed_input = future_to_input[future]
                logger.debug(traceback.format_exc())
                logger.warning(
                    f"Error processing result for {failed_input}: {exc}"
                )

def yield_table_query(self) -> Iterator[TableQuery]:
"""
Given an engine, iterate over the query results to
Expand Down Expand Up @@ -129,64 +156,77 @@ def _query_already_processed(self, table_query: TableQuery) -> bool:
)
return fqn.get_query_checksum(table_query.query) in checksums or {}

def query_lineage_generator(
    self, table_query: TableQuery
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
    """
    Turn a single logged query into lineage requests.

    Nothing is yielded when `_query_already_processed` reports the query
    as handled. Otherwise every result of `get_lineage_by_query` is
    yielded as-is, and each result carrying a `right` value is followed
    by a `CreateQueryRequest` for the original query text, flagged with
    `processedLineage=True`.
    """
    # Guard clause: skip queries whose lineage was already processed
    if self._query_already_processed(table_query):
        return

    parsed_lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query(
        self.metadata,
        query=table_query.query,
        service_name=table_query.serviceName,
        database_name=table_query.databaseName,
        schema_name=table_query.databaseSchema,
        dialect=self.dialect,
        timeout_seconds=self.source_config.parsingTimeoutLimit,
    )

    for parsed_lineage in parsed_lineages or []:
        yield parsed_lineage

        # Only a properly identified lineage triggers ingesting the query
        if not parsed_lineage.right:
            continue

        processed_query = CreateQueryRequest(
            query=SqlQuery(table_query.query),
            query_type=table_query.query_type,
            duration=table_query.duration,
            processedLineage=True,
            service=FullyQualifiedEntityName(self.config.serviceName),
        )
        yield Either(right=processed_query)

def yield_query_lineage(
self,
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
logger.info("Processing Query Lineage")
"""
Based on the query logs, prepare the lineage
and send it to the sink
"""
connection_type = str(self.service_connection.type.value)
dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
for table_query in self.get_table_query():
if not self._query_already_processed(table_query):
lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query(
self.metadata,
query=table_query.query,
service_name=table_query.serviceName,
database_name=table_query.databaseName,
schema_name=table_query.databaseSchema,
dialect=dialect,
timeout_seconds=self.source_config.parsingTimeoutLimit,
)
self.dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
producer_fn = self.get_table_query
processor_fn = self.query_lineage_generator
yield from self.generate_lineage_in_thread(producer_fn, processor_fn)

for lineage_request in lineages or []:
yield lineage_request

# If we identified lineage properly, ingest the original query
if lineage_request.right:
yield Either(
right=CreateQueryRequest(
query=SqlQuery(table_query.query),
query_type=table_query.query_type,
duration=table_query.duration,
processedLineage=True,
service=FullyQualifiedEntityName(
self.config.serviceName
),
)
def view_lineage_generator(
    self, view: TableView
) -> Iterable[Either[AddLineageRequest]]:
    """
    Compute the lineage of a single view.

    Results of `get_view_lineage` that carry a `right` value are wrapped
    in an `OMetaLineageRequest` together with the configured
    `overrideViewLineage` flag; results without one are passed through
    unchanged. Any exception raised along the way is logged and ends the
    processing of this view.
    """
    try:
        view_lineages = get_view_lineage(
            view=view,
            metadata=self.metadata,
            service_name=self.config.serviceName,
            connection_type=self.service_connection.type.value,
            timeout_seconds=self.source_config.parsingTimeoutLimit,
        )
        for view_lineage in view_lineages:
            # Results without a `right` value are forwarded untouched
            if view_lineage.right is None:
                yield view_lineage
                continue

            wrapped_request = OMetaLineageRequest(
                lineage_request=view_lineage.right,
                override_lineage=self.source_config.overrideViewLineage,
            )
            yield Either(right=wrapped_request)
    except Exception as exc:
        logger.debug(traceback.format_exc())
        logger.warning(f"Error processing view {view}: {exc}")

def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
logger.info("Processing View Lineage")
for view in self.metadata.yield_es_view_def(self.config.serviceName):
try:
for lineage in get_view_lineage(
view=view,
metadata=self.metadata,
service_name=self.config.serviceName,
connection_type=self.service_connection.type.value,
timeout_seconds=self.source_config.parsingTimeoutLimit,
):
if lineage.right is not None:
yield Either(
right=OMetaLineageRequest(
lineage_request=lineage.right,
override_lineage=self.source_config.overrideViewLineage,
)
)
else:
yield lineage
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error processing view {view}: {exc}")
producer_fn = partial(self.metadata.yield_es_view_def, self.config.serviceName)
processor_fn = self.view_lineage_generator
yield from self.generate_lineage_in_thread(producer_fn, processor_fn)

def yield_procedure_lineage(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.type.tableQuery import QueryByProcedure
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.mssql.constants import (
DEFAULT_DATETIME_FORMAT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ class QueryByProcedure(BaseModel):
model_config = ConfigDict(populate_by_name=True)


class ProcedureAndQuery(BaseModel):
    """
    Pairs a stored procedure with a query attributed to it, so that both
    can be handed over together as a single unit of work.
    """

    # Same model configuration as the neighbouring QueryByProcedure model
    model_config = ConfigDict(populate_by_name=True)

    # The stored procedure entity
    procedure: StoredProcedure
    # The query information associated with that procedure
    query_by_procedure: QueryByProcedure


class StoredProcedureLineageMixin(ABC):
"""
The full flow is:
Expand Down Expand Up @@ -204,10 +215,20 @@ def yield_procedure_query(
)
)

def yield_procedure_lineage(
self,
def procedure_lineage_processor(
    self, procedure_and_query: ProcedureAndQuery
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
    """
    Process one procedure/query pair: first yield everything produced by
    `_yield_procedure_lineage`, then everything produced by
    `yield_procedure_query`, both called with the same pair.
    """
    stored_procedure = procedure_and_query.procedure
    procedure_query = procedure_and_query.query_by_procedure

    yield from self._yield_procedure_lineage(
        query_by_procedure=procedure_query,
        procedure=stored_procedure,
    )
    yield from self.yield_procedure_query(
        query_by_procedure=procedure_query,
        procedure=stored_procedure,
    )


def procedure_lineage_generator(self) -> Iterable[ProcedureAndQuery]:
query = {
"query": {
"bool": {
Expand All @@ -233,9 +254,15 @@ def yield_procedure_lineage(
for query_by_procedure in (
queries_dict.get(procedure.name.root.lower()) or []
):
yield from self._yield_procedure_lineage(
query_by_procedure=query_by_procedure, procedure=procedure
)
yield from self.yield_procedure_query(
query_by_procedure=query_by_procedure, procedure=procedure
yield ProcedureAndQuery(
procedure=procedure, query_by_procedure=query_by_procedure
)

def yield_procedure_lineage(
    self,
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
    """
    Yield the stored procedure lineage by handing the items from
    `procedure_lineage_generator` to `procedure_lineage_processor`
    through `generate_lineage_in_thread`.
    """
    logger.info("Processing Lineage for Stored Procedures")
    yield from self.generate_lineage_in_thread(
        self.procedure_lineage_generator,
        self.procedure_lineage_processor,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@
"description": "Set the 'Process Stored ProcedureLog Lineage' toggle to control whether to process stored procedure lineage.",
"type": "boolean",
"default": true
},
"threads": {
"description": "Number of Threads to use in order to parallelize lineage ingestion.",
"type": "integer",
"default": 1,
"title": "Number of Threads",
"minimum": 1
}
},
"additionalProperties": false
Expand Down

0 comments on commit 54ce46a

Please sign in to comment.