Skip to content

Commit

Permalink
more lint fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal committed Sep 16, 2024
1 parent 159bc3e commit 0d46f7f
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
from urllib.parse import quote

import requests
from dremio_connector.dremio_sql_queries import DremioSQLQueries
from sqlglot import parse_one

from datahub.ingestion.source.dremio.dremio_sql_queries import DremioSQLQueries

logger = logging.getLogger(__name__)

_dml_queries = [
Expand Down Expand Up @@ -94,7 +95,7 @@ def __init__(
affected_datasets: Optional[str] = None,
):
self.job_id = job_id
self.username = (username,)
self.username = username
self.submitted_ts = self._get_submitted_ts(submitted_ts)
self.query = self._get_query(query)
self.query_without_comments = self.get_raw_query(query)
Expand Down Expand Up @@ -215,14 +216,14 @@ def set_credentials(self) -> None:
"Credentials cannot be refreshed. Please check your username and password"
)

def execute_get_request(self, url: str) -> json:
def execute_get_request(self, url: str) -> Dict:
"""execute a get request on dremio"""
response = requests.get(
url=(self.base_url + url), headers=self.headers, verify=self._verify
)
return response.json()

def execute_post_request(self, url: str, data: str) -> json:
def execute_post_request(self, url: str, data: str) -> Dict:
"""execute a get request on dremio"""
response = requests.post(
url=(self.base_url + url),
Expand All @@ -232,7 +233,7 @@ def execute_post_request(self, url: str, data: str) -> json:
)
return response.json()

def _execute_post_request_to_get_headers(self, headers: Dict, data: str) -> json:
def _execute_post_request_to_get_headers(self, headers: Dict, data: str) -> Dict:
"""execute a get request on dremio"""
response = requests.post(
url=f"{self.dremio_url}/apiv2/login",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class DremioSourceConfig(ConfigModel):
)

password: Optional[str] = Field(
default=9047,
default="9047",
description="Dremio REST API port",
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
""""This module contains datahub workunits and metadata change events"""

import logging
from typing import Dict, Iterable, Optional, Union

from dremio_connector.dremio_config import (
DremioFolderKey,
DremioSourceConfig,
DremioSpaceKey,
)
from dremio_connector.dremio_source_controller import DremioController
from typing import Dict, Iterable, List, Optional, Union

from datahub.emitter import mcp_builder
from datahub.emitter.mce_builder import (
Expand All @@ -28,6 +21,12 @@
from datahub.ingestion.api.source import Source, SourceCapability, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.source.dremio.dremio_config import (
DremioFolderKey,
DremioSourceConfig,
DremioSpaceKey,
)
from datahub.ingestion.source.dremio.dremio_source_controller import DremioController
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.metadata._urns.urn_defs import CorpUserUrn
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
Expand Down Expand Up @@ -97,14 +96,18 @@ class DremioSource(Source):
def __init__(self, config: DremioSourceConfig, ctx: PipelineContext):
super().__init__(ctx)
self.ctx = ctx
assert self.ctx.graph is not None
self.graph: DataHubGraph = self.ctx.graph
self.config = config
assert self.config.env is not None
self.env = self.config.env

self.dremio_source: DremioController = self.create_controller()
self.platform_instance = config.platform_instance
self.aggregator = SqlParsingAggregator(
platform=make_data_platform_urn(self.get_platform()),
platform_instance=self.platform_instance,
env=self.config.env,
env=self.env,
graph=self.ctx.graph,
generate_usage_statistics=True,
generate_operations=True,
Expand All @@ -118,7 +121,7 @@ def __init__(self, config: DremioSourceConfig, ctx: PipelineContext):

self.schema_resolver = self.graph.initialize_schema_resolver_from_datahub(
platform=self.get_platform(),
env=self.config.env,
env=self.env,
platform_instance=config.platform_instance,
)

Expand Down Expand Up @@ -156,14 +159,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
parent_folder=schema[-2].lower(),
platform=make_data_platform_urn("dremio"),
platform_instance=self.platform_instance,
env=self.config.env,
env=self.env,
)
else:
lst_con_key = DremioSpaceKey(
space_name=space_name.lower(),
platform=make_data_platform_urn("dremio"),
platform_instance=self.platform_instance,
env=self.config.env,
env=self.env,
)

yield from self.generate_containers(space_name, sub_type, schema[1:])
Expand Down Expand Up @@ -194,7 +197,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
platform=make_data_platform_urn("dremio"),
name=f"{'.'.join(schema)}.{table}".lower(),
platform_instance=self.platform_instance,
env=self.config.env,
env=self.env,
),
view_definition=definition,
default_db=schema[0].lower() if len(schema) > 0 else "",
Expand All @@ -210,7 +213,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
platform=make_data_platform_urn("dremio"),
name=ds.lower(),
platform_instance=self.platform_instance,
env=self.config.env,
env=self.env,
)
)

Expand All @@ -221,7 +224,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
platform=make_data_platform_urn("dremio"),
name=query.get("affected_datasets").lower(),
platform_instance=self.platform_instance,
env=self.config.env,
env=self.env,
)

self.aggregator.add_known_query_lineage(
Expand Down Expand Up @@ -255,7 +258,7 @@ def generate_containers(self, space_name, sub_type, rest_of_schema_items: list):
space_name=space_name.lower(),
platform=make_data_platform_urn("dremio"),
platform_instance=self.platform_instance,
env=self.config.env,
env=self.env,
)
con_wu_s = mcp_builder.gen_containers(
container_key=db_key,
Expand All @@ -276,15 +279,15 @@ def generate_containers(self, space_name, sub_type, rest_of_schema_items: list):
parent_folder=rest_of_schema_items[i - 1].lower(),
platform=make_data_platform_urn("dremio"),
platform_instance=self.platform_instance,
env=self.config.env,
env=self.env,
)
else:
schema_key = DremioFolderKey(
folder_name=rest_of_schema_items[i].lower(),
parent_folder=space_name.lower(),
platform=make_data_platform_urn("dremio"),
platform_instance=self.platform_instance,
env=self.config.env,
env=self.env,
)
con_wu_s = mcp_builder.gen_containers(
container_key=schema_key,
Expand All @@ -308,7 +311,7 @@ def generate_view_table_lineage(self, table, schema, definition) -> None:
platform=make_data_platform_urn("dremio"),
name=upstream_dataset.lower(),
platform_instance=self.platform_instance,
env=self.config.env,
env=self.env,
)
)

Expand All @@ -320,7 +323,7 @@ def generate_view_table_lineage(self, table, schema, definition) -> None:
downstream=make_dataset_urn_with_platform_instance(
platform=make_data_platform_urn("dremio"),
name=f"{'.'.join(schema)}.{table}".lower(),
env=self.config.env,
env=self.env,
platform_instance=self.platform_instance,
),
),
Expand All @@ -339,7 +342,7 @@ def construct_metadata_changes(
else:
dataset = table

env = self.config.env
env = self.env

mce.proposedSnapshot = DatasetSnapshotClass._construct_with_defaults()
mce.proposedSnapshot.urn = make_dataset_urn_with_platform_instance(
Expand Down Expand Up @@ -374,12 +377,16 @@ def construct_metadata_changes(
spec = specs.get(dataset)
sp_tags = self.val_and_get_param(spec, "sp_tags", list)
sp_paths = self.val_and_get_param(spec, "sp_browse_paths", list)
self.dremio_source.populate_tag_aspects(mce, sp_tags)
self.dremio_source.populate_path_aspects(mce, sp_paths)
if sp_tags is not None:
self.dremio_source.populate_tag_aspects(mce, sp_tags)
if sp_paths is not None:
self.dremio_source.populate_path_aspects(mce, sp_paths)

return True

def val_and_get_param(self, spec_map: dict, param_name: str, param_type: type):
def val_and_get_param(
self, spec_map: dict, param_name: str, param_type: type
) -> Optional[List]:
if param_name in spec_map and isinstance(param_type, spec_map[param_name]):
return spec_map[param_name]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
import time
from typing import Dict, List, Optional

from dremio_connector.dremio_api import DremioAPIOperations
from dremio_connector.dremio_sql_queries import DremioSQLQueries

from datahub.configuration.common import AllowDenyPattern
from datahub.emitter import mce_builder
from datahub.ingestion.source.dremio.dremio_api import DremioAPIOperations
from datahub.ingestion.source.dremio.dremio_sql_queries import DremioSQLQueries
from datahub.metadata.schema_classes import (
ArrayTypeClass,
AuditStampClass,
Expand Down

0 comments on commit 0d46f7f

Please sign in to comment.