Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.1.1 Added source class for GCP BigQuery #56

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
.idea/modules.xml
.idea/vcs.xml
.idea/workspace.xml
**/__pycache__/
71 changes: 39 additions & 32 deletions local-data-platform/local_data_platform/__init__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
from abc import ABC
from enum import Enum
from pathlib import Path
import os
from dataclasses import dataclass, asdict
from .exceptions import TableNotFound, PipelineNotFound, EngineNotFound
from collections import namedtuple

Transaction = namedtuple("Transaction", ["query", "desc"])


class SupportedFormat(Enum):
    """Data formats known to the platform.

    Each member's value is the same upper-case string as its name, so a
    member can be looked up from its string form, e.g.
    ``SupportedFormat("CSV")``.
    """

    # Each name is defined exactly once: reusing a member name inside an Enum
    # body raises TypeError at class-creation time.
    ICEBERG = "ICEBERG"
    PARQUET = "PARQUET"
    CSV = "CSV"
    JSON = "JSON"


class SupportedEngine(Enum):
    """Execution engines known to the platform.

    Each member's value is the same upper-case string as its name, so a
    member can be looked up from its string form, e.g.
    ``SupportedEngine("DUCKDB")``.
    """

    # Each name is defined exactly once: reusing a member name inside an Enum
    # body raises TypeError at class-creation time.
    PYARROW = "PYARROW"
    PYSPARK = "PYSPARK"
    DUCKDB = "DUCKDB"
    BIGQUERY = "BIGQUERY"


class Base(ABC):

def __init__(self, *args, **kwargs): pass
def __init__(self, *args, **kwargs):
pass

def get(self):
pass
Expand All @@ -28,19 +37,19 @@ def put(self):

class Table(Base):
    """Base description of a named table located at a path.

    ``get`` and ``put`` always raise ``TableNotFound`` in this class.
    NOTE(review): presumably format-specific subclasses override them and set
    a ``format`` attribute - confirm against the subclasses.

    Args:
        name: Table name; used in error messages.
        path: Location of the table. When omitted, the current working
            directory at construction time is used.
    """

    def __init__(self, name: str, path: "Path | str | None" = None):
        self.name = name
        # Resolve the default per call. A signature default of ``os.getcwd()``
        # is evaluated once at import time and goes stale after any later
        # ``os.chdir``.
        self.path = os.getcwd() if path is None else path

    def _not_found(self) -> "TableNotFound":
        """Build the TableNotFound error shared by ``get`` and ``put``."""
        # ``format`` is never assigned in this class, so read it defensively;
        # a bare ``self.format`` would raise AttributeError and mask the
        # intended TableNotFound.
        table_format = getattr(self, "format", None)
        return TableNotFound(
            f"Table {self.name} of type {table_format} cannot be accessed at {self.path}"
        )

    def get(self):
        """Always raise ``TableNotFound``; no read is implemented here."""
        raise self._not_found()

    def put(self):
        """Always raise ``TableNotFound``; no write is implemented here."""
        raise self._not_found()


class Flow(Base):
Expand All @@ -49,21 +58,24 @@ class Flow(Base):
target: Table

def extract(self):
    """Placeholder extract step.

    Raises:
        PipelineNotFound: Always; the message names this pipeline and its
            source table.
    """
    # A single raise: a second raise statement after it could never execute.
    raise PipelineNotFound(
        f"Pipeline {self.name} cannot extract data from {self.source.name}"
    )

def transform(self):
    """Placeholder transform step.

    Raises:
        PipelineNotFound: Always; the message names this pipeline and its
            source table.
    """
    # A single raise: a second raise statement after it could never execute.
    raise PipelineNotFound(
        f"Pipeline {self.name} cannot transform data from {self.source.name}"
    )

def load(self):
    """Placeholder load step.

    Raises:
        PipelineNotFound: Always; the message names this pipeline and its
            target table.
    """
    # A single raise: a second raise statement after it could never execute.
    raise PipelineNotFound(
        f"Pipeline {self.name} cannot load data at {self.target.name}"
    )


class Worker(Base):

def __init__(self, name: str):
    """Store the worker's name.

    Args:
        name: Name of the engine this worker represents; it is interpolated
            into the ``EngineNotFound`` message raised elsewhere in the class.
    """
    # One signature only: the superseded multi-line form of this same
    # signature is dropped.
    self.name = name

def get(self):
Expand All @@ -73,14 +85,9 @@ def put(self):
raise EngineNotFound(f"Worker {self.name} is not a supported engine")



@dataclass
class Config(Base):
__slots__ = (
"identifier",
"who",
"metadata"
)
__slots__ = ("identifier", "who", "metadata")
identifier: str
who: str
what: str
Expand All @@ -90,8 +97,8 @@ class Config(Base):
metadata: Flow







@dataclass
class Credentials(Base):
    """Pair of a credentials path and a project identifier.

    NOTE(review): presumably a service-account key file and GCP project used
    by the BigQuery source - confirm against the caller.

    Attributes:
        path: Path associated with the credentials.
        project_id: Identifier of the project the credentials belong to.
    """

    __slots__ = "path", "project_id"

    path: Path
    project_id: str
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,16 +1,44 @@
from pyiceberg.catalog.sql import SqlCatalog
from typing import List
from pyiceberg.typedef import Identifier
from sqlite3 import OperationalError
from local_data_platform.logger import log


logger = log()
class LocalIcebergCatalog(SqlCatalog):
"""
LocalIcebergCatalog is a subclass of SqlCatalog that provides methods to interact with a local Iceberg catalog using SQLite.
Attributes:
name (str): The name of the catalog.
uri (str): The URI for the SQLite database.
warehouse (str): The file path for the warehouse.
Methods:
__init__(name: str, path: str, *args, **kwargs):
Initializes the LocalIcebergCatalog with the given name and path.
Args:
name (str): The name of the catalog.
path (str): The file path where the catalog is stored.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Raises:
Exception: If initialization fails.
get_dbs() -> List[Identifier]:
Returns a list of database identifiers in the catalog.
get_tables(namespace: Identifier):
Returns a list of tables in the specified namespace.
"""

def __init__(self, name: str, path: str, *args, **kwargs):
    """Set up a SQLite-backed catalog stored under ``path``.

    Args:
        name: Catalog name; also used as the SQLite file name (``<name>.db``).
        path: Directory that holds the SQLite file and serves as the
            warehouse root (``file://<path>``).
        *args: Forwarded to the parent initializer.
        **kwargs: Forwarded to the parent initializer.

    Raises:
        Exception: If the parent initializer fails. The original error is
            chained as ``__cause__`` so its traceback is preserved.
    """
    self.name = name
    self.uri = f"sqlite:///{path}/{name}.db"
    self.warehouse = f"file://{path}"

    # Starting up is not an error condition, so this is logged at INFO.
    logger.info(f"Initializing LocalIcebergCatalog with {self.uri}")
    try:
        # The attributes set above (name, uri, warehouse) are handed to the
        # parent as keyword arguments via ``self.__dict__``.
        super().__init__(*args, **kwargs, **self.__dict__)
    except Exception as e:
        logger.error(f"Failed to initialize LocalIcebergCatalog {e}")
        raise Exception(f"Failed to initialize LocalIcebergCatalog {e}") from e

def get_dbs(self) -> List[Identifier]:
    """Return the namespaces of this catalog as given by ``list_namespaces``."""
    namespaces = self.list_namespaces()
    return namespaces

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,9 @@ def put(self, df: Table):
Writing data from PyArrow Table of size {len(df)} records
"""
)
with open(self.path, 'wb') as f:
csv.write_csv(df, f)
if df is not None or len(df) > 0:
with open(self.path, "wb") as f:
csv.write_csv(df, f)
else:
logger.error("No data to write to CSV as the data is empty")
raise ValueError("No data to write")
37 changes: 30 additions & 7 deletions local-data-platform/local_data_platform/format/iceberg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,48 @@


class Iceberg(Format):
"""
Iceberg class for handling data operations with Iceberg tables.

Attributes:
catalog_identifier (str): Identifier for the Iceberg catalog.
catalog (LocalIcebergCatalog): Instance of LocalIcebergCatalog for managing Iceberg tables.
identifier (str): Unique identifier for the Iceberg table.
metadata (dict): Metadata associated with the Iceberg table.

Methods:
__init__(catalog: str, *args, **kwargs):
Initializes the Iceberg instance with the given catalog and metadata.

put(df: Table) -> Table:
Writes the given data frame to the Iceberg table.

get():
Fetches data from the Iceberg table and returns it as an Arrow table.
"""
def __init__(self, catalog: dict, *args, **kwargs):
    """Create the local catalog, make sure its namespace exists, and record
    the table identifier.

    Args:
        catalog: Mapping with at least the keys ``"identifier"`` (used as
            both catalog name and namespace) and ``"warehouse_path"``.
        *args: Forwarded to the parent initializer.
        **kwargs: Must contain ``"name"`` (the table name). The whole mapping
            is stored as ``self.metadata`` and forwarded to the parent
            initializer.
    """
    # Imported locally so this method does not depend on the module's
    # import block.
    from pyiceberg.exceptions import NamespaceAlreadyExistsError

    logger.info(f"Iceberg catalog : {catalog}")
    self.catalog_identifier = catalog["identifier"]
    self.catalog = LocalIcebergCatalog(
        self.catalog_identifier, path=catalog["warehouse_path"]
    )
    try:
        self.catalog.create_namespace(self.catalog_identifier)
    except NamespaceAlreadyExistsError:
        # The catalog database persists on disk, so any later run against
        # the same warehouse finds the namespace already present. That is
        # the expected steady state, not a failure.
        logger.info(f"Namespace {self.catalog_identifier} already exists")
    self.identifier = f"{self.catalog_identifier}.{kwargs['name']}"
    self.metadata = kwargs
    logger.info(f"Iceberg created with catalog namespace {self.catalog_identifier}")
    logger.info(f"Iceberg initialised with identifier {self.identifier}")
    super().__init__(*args, **kwargs)

def put(self, df: Table) -> Table:
    """Append ``df`` to the Iceberg table, creating the table if needed.

    Args:
        df: Data to append. ``len(df)`` is logged and ``df.schema`` is used
            as the schema when the table has to be created.

    Returns:
        The table object returned by the catalog's
        ``create_table_if_not_exists``. NOTE(review): this is the catalog's
        table handle, which may not match the ``Table`` annotation - confirm.
    """
    logger.info(f"self.identifier {self.identifier}")
    logger.info(f"\nWriting {len(df)} to Iceberg Table {self.identifier}\n")
    # Look up / create the table exactly once before appending.
    table = self.catalog.create_table_if_not_exists(
        identifier=self.identifier, schema=df.schema
    )
    table.append(df)
    return table

Expand All @@ -40,7 +63,7 @@ def get(self):
data = self.catalog.load_table(self.identifier).scan().to_arrow()
logger.info(
f"""
Returning {len(data)} records from Iceberg Table
Returning {len(data)} records from Iceberg Table {self.identifier}
"""
)
return data
11 changes: 9 additions & 2 deletions local-data-platform/local_data_platform/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@


def log():
    """Configure root logging and return the package logger.

    ``basicConfig`` only has an effect while the root logger has no handlers,
    so the format below is installed by the first call; later calls simply
    return the same named logger.

    Returns:
        logging.Logger: The logger named ``'loca_data_platform-demo'``.
    """
    # Multi-line record layout: source location first, then timestamp and
    # logger name, then level, then the message.
    record_format = (
        "\n"
        "%(filename)s - %(funcName)s\n"
        "- %(asctime)s - %(name)s\n"
        "- %(levelname)s\n"
        "- message : %(message)s\n"
    )
    # Configure once and return once. A second basicConfig call would be a
    # no-op, and a second return statement would be unreachable.
    basicConfig(level=INFO, format=record_format)
    return getLogger('loca_data_platform-demo')
4 changes: 2 additions & 2 deletions local-data-platform/local_data_platform/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Pipeline(Flow):

def __init__(self, config: Config, *args, **kwargs):
self.config = config
self.source = Source(**config.metadata['source'])
self.target = Target(**config.metadata['target'])
# self.source = Source(**config.metadata['source'])
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mrutunjay-kinagi what went wrong here?

# self.target = Target(**config.metadata['target'])
super().__init__(*args, **kwargs)

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from local_data_platform.pipeline.egression import Egression
from pyarrow import parquet, Table
from local_data_platform.format.csv import CSV
from local_data_platform.format.iceberg import Iceberg
from local_data_platform import Config
from local_data_platform.logger import log


logger = log()


class CSVToIceberg(Egression):
    """Pipeline wiring a CSV source to an Iceberg target.

    Attributes:
        source (CSV): Built from ``config.metadata["source"]``.
        target (Iceberg): Built from ``config.metadata["target"]``.
    """

    def __init__(self, config: Config, *args, **kwargs):
        """Build the CSV source and Iceberg target described by ``config``.

        Args:
            config: Its ``metadata["source"]`` must provide ``"name"`` and
                ``"path"``; its ``metadata["target"]`` must provide
                ``"name"`` and ``"catalog"``.
            *args: Accepted but not used here.
            **kwargs: Accepted but not used here.

        NOTE(review): the parent initializer is not invoked - confirm this
        is intended.
        """
        logger.info(f"\nInitialising CSVToIceberg with config {config}\n")

        source_spec = config.metadata["source"]
        target_spec = config.metadata["target"]

        # Source first, then target, so construction order is unchanged.
        self.source = CSV(name=source_spec["name"], path=source_spec["path"])
        self.target = Iceberg(
            name=target_spec["name"], catalog=target_spec["catalog"]
        )

        logger.info(
            "\nCSVToIceberg initialised with\n"
            f"source {self.source}\n"
            f"target {self.target}\n"
        )
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,33 @@


class IcebergToCSV(Egression):
"""
IcebergToCSV is a class that handles the transformation of data from an Iceberg source to a CSV target.

Attributes:
source (Iceberg): The source Iceberg configuration.
target (CSV): The target CSV configuration.

Methods:
__init__(config: Config, *args, **kwargs):
Initializes the IcebergToCSV instance with the provided configuration.
Logs the initialization process and sets up the source and target configurations.

Args:
config (Config): Configuration object containing metadata for source and target.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
"""
def __init__(self, config: Config, *args, **kwargs):
self.source = config.metadata['source']
self.target = config.metadata['target']
self.target = CSV(
name=self.target['name'],
path=self.target['path']
)
self.source = Iceberg(
name=self.source['name'],
catalog=self.source['catalog']
logger.info(
f"""
Initialising IcebergToCSV with config {config}
"""
)
self.source = config.metadata["source"]
self.target = config.metadata["target"]
self.target = CSV(name=self.target["name"], path=self.target["path"])
self.source = Iceberg(name=self.source["name"], catalog=self.source["catalog"])
logger.info(
f"""
IcebergToCSV initialised with
Expand Down
Loading