diff --git a/.gitignore b/.gitignore index e735351..eee2dcb 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ .idea/modules.xml .idea/vcs.xml .idea/workspace.xml +**/__pycache__/ \ No newline at end of file diff --git a/local-data-platform/local_data_platform/__init__.py b/local-data-platform/local_data_platform/__init__.py index 003b51c..a2e81dd 100644 --- a/local-data-platform/local_data_platform/__init__.py +++ b/local-data-platform/local_data_platform/__init__.py @@ -1,23 +1,32 @@ from abc import ABC from enum import Enum +from pathlib import Path +import os from dataclasses import dataclass, asdict from .exceptions import TableNotFound, PipelineNotFound, EngineNotFound +from collections import namedtuple + +Transaction = namedtuple("Transaction", ["query", "desc"]) + class SupportedFormat(Enum): - ICEBERG = 'ICEBERG' - PARQUET = 'PARQUET' - CSV = 'CSV' + ICEBERG = "ICEBERG" + PARQUET = "PARQUET" + CSV = "CSV" + JSON = "JSON" class SupportedEngine(Enum): - PYARROW = 'PYARROW' - PYSPARK = 'PYSPARK' - DUCKDB = 'DUCKDB' + PYARROW = "PYARROW" + PYSPARK = "PYSPARK" + DUCKDB = "DUCKDB" + BIGQUERY = "BIGQUERY" class Base(ABC): - def __init__(self, *args, **kwargs): pass + def __init__(self, *args, **kwargs): + pass def get(self): pass @@ -28,19 +37,19 @@ def put(self): class Table(Base): - def __init__( - self, - name: str, - path: str = None - ): + def __init__(self, name: str, path: Path = os.getcwd()): self.name = name self.path = path def get(self): - raise TableNotFound(f"Table {self.name} of type {self.format} cannot be accessed at {self.path}") + raise TableNotFound( + f"Table {self.name} of type {self.format} cannot be accessed at {self.path}" + ) def put(self): - raise TableNotFound(f"Table {self.name} of type {self.format} cannot be accessed at {self.path}") + raise TableNotFound( + f"Table {self.name} of type {self.format} cannot be accessed at {self.path}" + ) class Flow(Base): @@ -49,21 +58,24 @@ class Flow(Base): target: Table def extract(self): - raise PipelineNotFound(f"Pipeline {self.name} cannot extract data from {self.source.name}") + raise PipelineNotFound( + f"Pipeline {self.name} cannot extract data from {self.source.name}" + ) def transform(self): - raise PipelineNotFound(f"Pipeline {self.name} cannot transform data from {self.source.name}") + raise PipelineNotFound( + f"Pipeline {self.name} cannot transform data from {self.source.name}" + ) def load(self): - raise PipelineNotFound(f"Pipeline {self.name} cannot load data at {self.target.name}") + raise PipelineNotFound( + f"Pipeline {self.name} cannot load data at {self.target.name}" + ) class Worker(Base): - def __init__( - self, - name: str - ): + def __init__(self, name: str): self.name = name def get(self): @@ -73,14 +85,9 @@ def put(self): raise EngineNotFound(f"Worker {self.name} is not a supported engine") - @dataclass class Config(Base): - __slots__ = ( - "identifier", - "who", - "metadata" - ) + __slots__ = ("identifier", "who", "metadata") identifier: str who: str what: str @@ -90,8 +97,8 @@ class Config(Base): metadata: Flow - - - - - +@dataclass +class Credentials(Base): + __slots__ = ("path", "project_id") + path: Path + project_id: str diff --git a/local-data-platform/local_data_platform/__pycache__/__init__.cpython-312.pyc b/local-data-platform/local_data_platform/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index c3ca9a8..0000000 Binary files a/local-data-platform/local_data_platform/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git 
a/local-data-platform/local_data_platform/__pycache__/hello_world.cpython-312.pyc b/local-data-platform/local_data_platform/__pycache__/hello_world.cpython-312.pyc deleted file mode 100644 index ccc0589..0000000 Binary files a/local-data-platform/local_data_platform/__pycache__/hello_world.cpython-312.pyc and /dev/null differ diff --git a/local-data-platform/local_data_platform/catalog/local/iceberg/__init__.py b/local-data-platform/local_data_platform/catalog/local/iceberg/__init__.py index 274d112..2bdd147 100644 --- a/local-data-platform/local_data_platform/catalog/local/iceberg/__init__.py +++ b/local-data-platform/local_data_platform/catalog/local/iceberg/__init__.py @@ -1,16 +1,44 @@ from pyiceberg.catalog.sql import SqlCatalog from typing import List from pyiceberg.typedef import Identifier +from sqlite3 import OperationalError +from local_data_platform.logger import log - +logger = log() class LocalIcebergCatalog(SqlCatalog): + """ + LocalIcebergCatalog is a subclass of SqlCatalog that provides methods to interact with a local Iceberg catalog using SQLite. + Attributes: + name (str): The name of the catalog. + uri (str): The URI for the SQLite database. + warehouse (str): The file path for the warehouse. + Methods: + __init__(name: str, path: str, *args, **kwargs): + Initializes the LocalIcebergCatalog with the given name and path. + Args: + name (str): The name of the catalog. + path (str): The file path where the catalog is stored. + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + Raises: + Exception: If initialization fails. + get_dbs() -> List[Identifier]: + Returns a list of database identifiers in the catalog. + get_tables(namespace: Identifier): + Returns a list of tables in the specified namespace. + """ def __init__(self, name: str, path: str, *args, **kwargs): self.name = name - self.uri = f"sqlite:///{path}/pyiceberg_catalog.db" # Ensure .db file extension + self.uri = f"sqlite:///{path}/{name}.db" self.warehouse = f"file://{path}" - super().__init__(*args, **kwargs, **self.__dict__) - + try: + logger.error(f"Initializing LocalIcebergCatalog with {self.uri}") + super().__init__(*args, **kwargs, **self.__dict__) + except Exception as e: + logger.error(f"Failed to initialize LocalIcebergCatalog {e}") + raise Exception(f"Failed to initialize LocalIcebergCatalog {e}") + def get_dbs(self) -> List[Identifier]: return self.list_namespaces() diff --git a/local-data-platform/local_data_platform/format/csv/__init__.py b/local-data-platform/local_data_platform/format/csv/__init__.py index 020eed8..51ad4d0 100644 --- a/local-data-platform/local_data_platform/format/csv/__init__.py +++ b/local-data-platform/local_data_platform/format/csv/__init__.py @@ -41,5 +41,9 @@ def put(self, df: Table): Writing data from PyArrow Table of size {len(df)} records """ ) - with open(self.path, 'wb') as f: - csv.write_csv(df, f) + if df is not None or len(df) > 0: + with open(self.path, "wb") as f: + csv.write_csv(df, f) + else: + logger.error("No data to write to CSV as the data is empty") + raise ValueError("No data to write") diff --git a/local-data-platform/local_data_platform/format/iceberg/__init__.py b/local-data-platform/local_data_platform/format/iceberg/__init__.py index b53b6e6..5b81d08 100644 --- a/local-data-platform/local_data_platform/format/iceberg/__init__.py +++ b/local-data-platform/local_data_platform/format/iceberg/__init__.py @@ -9,25 +9,48 @@ class Iceberg(Format): + """ + Iceberg class for handling data operations with Iceberg tables. 
+ Attributes: + catalog_identifier (str): Identifier for the Iceberg catalog. + catalog (LocalIcebergCatalog): Instance of LocalIcebergCatalog for managing Iceberg tables. + identifier (str): Unique identifier for the Iceberg table. + metadata (dict): Metadata associated with the Iceberg table. + + Methods: + __init__(catalog: str, *args, **kwargs): + Initializes the Iceberg instance with the given catalog and metadata. + + put(df: Table) -> Table: + Writes the given data frame to the Iceberg table. + + get(): + Fetches data from the Iceberg table and returns it as an Arrow table. + """ def __init__(self, catalog: str, *args, **kwargs): + logger.info(f"Iceberg catalog : {catalog}") + self.catalog_identifier = catalog["identifier"] self.catalog = LocalIcebergCatalog( - catalog['identifier'], - path=catalog['warehouse_path'] + self.catalog_identifier, path=catalog["warehouse_path"] ) - - self.identifier = f"{catalog['identifier']}.{kwargs['name']}" + self.catalog.create_namespace(self.catalog_identifier) + self.identifier = f"{self.catalog_identifier}.{kwargs['name']}" self.metadata = kwargs + logger.info(f"Iceberg created with catalog namespace {self.catalog_identifier}") + logger.info(f"Iceberg initialised with identifier {self.identifier}") super().__init__(*args, **kwargs) def put(self, df: Table) -> Table: logger.info(f"self.identifier {self.identifier}") logger.info( f""" - Writing {len(df)} to Iceberg Table + Writing {len(df)} to Iceberg Table {self.identifier} """ ) - table = self.catalog.create_table_if_not_exists(identifier=self.identifier, schema=df.schema) + table = self.catalog.create_table_if_not_exists( + identifier=self.identifier, schema=df.schema + ) table.append(df) return table @@ -40,7 +63,7 @@ def get(self): data = self.catalog.load_table(self.identifier).scan().to_arrow() logger.info( f""" - Returning {len(data)} records from Iceberg Table + Returning {len(data)} records from Iceberg Table {self.identifier} """ ) return data diff --git a/local-data-platform/local_data_platform/logger.py b/local-data-platform/local_data_platform/logger.py index 1860020..5a00313 100644 --- a/local-data-platform/local_data_platform/logger.py +++ b/local-data-platform/local_data_platform/logger.py @@ -6,6 +6,13 @@ def log(): - basicConfig(level=INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + basicConfig(level=INFO, format= + """ + %(filename)s - %(funcName)s + - %(asctime)s - %(name)s + - %(levelname)s + - message : %(message)s + """ + ) - return getLogger('loca_data_platform') + return getLogger('loca_data_platform-demo') diff --git a/local-data-platform/local_data_platform/pipeline/__init__.py b/local-data-platform/local_data_platform/pipeline/__init__.py index 5f61d6c..e231632 100644 --- a/local-data-platform/local_data_platform/pipeline/__init__.py +++ b/local-data-platform/local_data_platform/pipeline/__init__.py @@ -11,7 +11,7 @@ class Pipeline(Flow): def __init__(self, config: Config, *args, **kwargs): self.config = config - self.source = Source(**config.metadata['source']) - self.target = Target(**config.metadata['target']) + # self.source = Source(**config.metadata['source']) + # self.target = Target(**config.metadata['target']) super().__init__(*args, **kwargs) diff --git a/local-data-platform/local_data_platform/pipeline/egression/csv_to_iceberg/__init__.py b/local-data-platform/local_data_platform/pipeline/egression/csv_to_iceberg/__init__.py new file mode 100644 index 0000000..43fcd3c --- /dev/null +++ 
b/local-data-platform/local_data_platform/pipeline/egression/csv_to_iceberg/__init__.py @@ -0,0 +1,41 @@ +from local_data_platform.pipeline.egression import Egression +from pyarrow import parquet, Table +from local_data_platform.format.csv import CSV +from local_data_platform.format.iceberg import Iceberg +from local_data_platform import Config +from local_data_platform.logger import log + + +logger = log() + + +class CSVToIceberg(Egression): + """ + CSVToIceberg is a class that handles the transformation of CSV data to Iceberg format. + + Attributes: + source (CSV): The source CSV configuration. + target (Iceberg): The target Iceberg configuration. + + Methods: + __init__(config: Config, *args, **kwargs): + Initializes the CSVToIceberg instance with the provided configuration. + Logs the initialization process and sets up the source and target attributes. + """ + def __init__(self, config: Config, *args, **kwargs): + logger.info( + f""" + Initialising CSVToIceberg with config {config} + """ + ) + self.source = config.metadata["source"] + self.target = config.metadata["target"] + self.source = CSV(name=self.source["name"], path=self.source["path"]) + self.target = Iceberg(name=self.target["name"], catalog=self.target["catalog"]) + logger.info( + f""" + CSVToIceberg initialised with + source {self.source} + target {self.target} + """ + ) diff --git a/local-data-platform/local_data_platform/pipeline/egression/iceberg_to_csv/__init__.py b/local-data-platform/local_data_platform/pipeline/egression/iceberg_to_csv/__init__.py index ee4b6c0..39d7b2c 100644 --- a/local-data-platform/local_data_platform/pipeline/egression/iceberg_to_csv/__init__.py +++ b/local-data-platform/local_data_platform/pipeline/egression/iceberg_to_csv/__init__.py @@ -10,18 +10,33 @@ class IcebergToCSV(Egression): + """ + IcebergToCSV is a class that handles the transformation of data from an Iceberg source to a CSV target. + Attributes: + source (Iceberg): The source Iceberg configuration. + target (CSV): The target CSV configuration. + + Methods: + __init__(config: Config, *args, **kwargs): + Initializes the IcebergToCSV instance with the provided configuration. + Logs the initialization process and sets up the source and target configurations. + + Args: + config (Config): Configuration object containing metadata for source and target. + *args: Additional positional arguments. + **kwargs: Additional keyword arguments. 
+ """ def __init__(self, config: Config, *args, **kwargs): - self.source = config.metadata['source'] - self.target = config.metadata['target'] - self.target = CSV( - name=self.target['name'], - path=self.target['path'] - ) - self.source = Iceberg( - name=self.source['name'], - catalog=self.source['catalog'] + logger.info( + f""" + Initialising IcebergToCSV with config {config} + """ ) + self.source = config.metadata["source"] + self.target = config.metadata["target"] + self.target = CSV(name=self.target["name"], path=self.target["path"]) + self.source = Iceberg(name=self.source["name"], catalog=self.source["catalog"]) logger.info( f""" IcebergToCSV initialised with diff --git a/local-data-platform/local_data_platform/pipeline/ingestion/bigquery_to_csv/__init__.py b/local-data-platform/local_data_platform/pipeline/ingestion/bigquery_to_csv/__init__.py new file mode 100644 index 0000000..1fc05d4 --- /dev/null +++ b/local-data-platform/local_data_platform/pipeline/ingestion/bigquery_to_csv/__init__.py @@ -0,0 +1,68 @@ +from local_data_platform.pipeline.ingestion import Ingestion +from local_data_platform.store.source.json import Json +from local_data_platform.store.source.gcp.bigquery import BigQuery, GCPCredentials +from local_data_platform.format.csv import CSV +from local_data_platform.logger import log +import os +from pathlib import Path + +logger = log() + + +class BigQueryToCSV(Ingestion): + """ + BigQueryToCSV is a class responsible for extracting data from Google BigQuery and saving it as a CSV file. + + Attributes: + source_config (dict): Configuration details for the data source. + target_config (dict): Configuration details for the data target. + credentials_path (Path): Path to the credentials file for Google Cloud Platform. + credentials (GCPCredentials): Credentials object for accessing Google Cloud Platform. + source (BigQuery): BigQuery object for interacting with Google BigQuery. + target_path (Path): Path where the CSV file will be saved. + target (CSV): CSV object for handling CSV file operations. + source_config_path (Path): Path to the source configuration file. + query (str): SQL query to be executed on the BigQuery source. + + Methods: + __init__(config, *args, **kwargs): Initializes the BigQueryToCSV object with the provided configuration. + extract(): Executes the query on the BigQuery source and returns the result. 
+ """ + def __init__(self, config, *args, **kwargs): + self.source_config = config.metadata["source"] + print("source_config", self.source_config) + self.target_config = config.metadata["target"] + self.credentials_path = Path( + os.getcwd() + self.source_config["credentials"]["path"] + ) + self.credentials = GCPCredentials( + path=self.credentials_path, + kwargs=Json( + name=self.source_config["credentials"]["name"], + path=self.credentials_path, + ).get(), + ) + + self.source = BigQuery( + name=self.source_config["name"], + credentials=self.credentials, + path=self.source_config["path"], + ) + self.target_path = Path(os.getcwd() + self.target_config["path"]) + self.target = CSV(name=self.target_config["name"], path=self.target_path) + self.source_config_path = Path(os.getcwd() + self.source_config["path"]) + self.query = Json( + name=self.source_config["name"], path=self.source_config_path + ).get()["query"] + + logger.info( + f""" + BigQueryToCSV initialised with + source {self.source} + target {self.target} + """ + ) + super().__init__(config, *args, **kwargs) + + def extract(self): + return self.source.get(query=self.query) diff --git a/local-data-platform/local_data_platform/pipeline/ingestion/csv_to_iceberg/__init__.py b/local-data-platform/local_data_platform/pipeline/ingestion/csv_to_iceberg/__init__.py new file mode 100644 index 0000000..9965f7f --- /dev/null +++ b/local-data-platform/local_data_platform/pipeline/ingestion/csv_to_iceberg/__init__.py @@ -0,0 +1,51 @@ +from local_data_platform.pipeline.ingestion import Ingestion +from local_data_platform.format.csv import CSV +from local_data_platform.format.iceberg import Iceberg +from local_data_platform.logger import log + +logger = log() + +class CSVToIceberg(Ingestion): + """ + CSVToIceberg is a class responsible for ingesting data from a CSV source and + loading it into an Iceberg target. + + Attributes: + source (CSV): The source CSV configuration. + target (Iceberg): The target Iceberg configuration. + + Methods: + __init__(config, *args, **kwargs): + Initializes the CSVToIceberg instance with the provided configuration. + Logs the initialization details of the source and target. + + Args: + config (Config): Configuration object containing metadata for source and target. + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + """ + def __init__(self, config, *args, **kwargs): + self.source = config.metadata['source'] + self.target = config.metadata['target'] + self.source = CSV( + name=self.source['name'], + path=self.source['path'] + ) + logger.info( + f""" + CSVToIceberg initialised with + source {self.source} + """ + ) + self.target = Iceberg( + name=self.target['name'], + catalog=self.target['catalog'] + ) + logger.info( + f""" + CSVToIceberg initialised with + source {self.source} + target {self.target} + """ + ) + super().__init__(config, *args, **kwargs) \ No newline at end of file diff --git a/local-data-platform/local_data_platform/pipeline/ingestion/parquet_to_iceberg/__init__.py b/local-data-platform/local_data_platform/pipeline/ingestion/parquet_to_iceberg/__init__.py index f9b58e8..ce626df 100644 --- a/local-data-platform/local_data_platform/pipeline/ingestion/parquet_to_iceberg/__init__.py +++ b/local-data-platform/local_data_platform/pipeline/ingestion/parquet_to_iceberg/__init__.py @@ -10,7 +10,21 @@ class ParquetToIceberg(Egression): + """ + ParquetToIceberg is a class responsible for transforming data from a Parquet source to an Iceberg target. 
+ Attributes: + source (Parquet): The source Parquet configuration. + target (Iceberg): The target Iceberg configuration. + + Methods: + __init__(config: Config, *args, **kwargs): + Initializes the ParquetToIceberg instance with the provided configuration. + Args: + config (Config): Configuration object containing metadata for source and target. + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + """ def __init__(self, config: Config, *args, **kwargs): self.source = config.metadata['source'] self.target = config.metadata['target'] diff --git a/local-data-platform/local_data_platform/pipeline/ingestion/pyarrow/__init__.py b/local-data-platform/local_data_platform/pipeline/ingestion/pyarrow/__init__.py index f2a2ae4..fec4510 100644 --- a/local-data-platform/local_data_platform/pipeline/ingestion/pyarrow/__init__.py +++ b/local-data-platform/local_data_platform/pipeline/ingestion/pyarrow/__init__.py @@ -9,7 +9,23 @@ class PyArrowLoader(Ingestion): + """ + PyArrowLoader is a class responsible for loading data using the PyArrow library. + Attributes: + config (Config): Configuration object containing settings for the loader. + + Methods: + __init__(config: Config, *args, **kwargs): + Initializes the PyArrowLoader with the given configuration. + + _extract() -> Table: + Extracts data from the source. + + load(): + Loads the extracted data into the target. + """ + def __init__(self, config: Config, *args, **kwargs): self.config = config diff --git a/local-data-platform/local_data_platform/source/gcp/gcp_bigquery/source_gcp_bigquery.py b/local-data-platform/local_data_platform/source/gcp/gcp_bigquery/source_gcp_bigquery.py deleted file mode 100644 index 75f6425..0000000 --- a/local-data-platform/local_data_platform/source/gcp/gcp_bigquery/source_gcp_bigquery.py +++ /dev/null @@ -1,23 +0,0 @@ -from source.gcp.gcp_connection import GCPBigQueryConnection - -class SourceGCPBigQuery: - """ - A class to interact with Google BigQuery. - client : GCPBigQueryConnection - An instance of GCPBigQueryConnection to interact with BigQuery. - """ - def __init__(self): - self.client = GCPBigQueryConnection('path/to/credentials.json', 'your-project-id') - - def fetch_data(self, query): - """ - Executes a given SQL query on Google BigQuery and returns the result as a pandas DataFrame. - - Args: - query (str): The SQL query to be executed. - - Returns: - pandas.DataFrame: The result of the query as a DataFrame. - """ - query_job = self.client.query(query) - return query_job.to_dataframe() \ No newline at end of file diff --git a/local-data-platform/local_data_platform/source/gcp/gcp_connection.py b/local-data-platform/local_data_platform/source/gcp/gcp_connection.py deleted file mode 100644 index d582e6a..0000000 --- a/local-data-platform/local_data_platform/source/gcp/gcp_connection.py +++ /dev/null @@ -1,60 +0,0 @@ -from google.cloud import bigquery -from google.oauth2 import service_account - -class GCPBigQueryConnection: - """ - GCPBigQueryConnection is a class that facilitates connection to Google Cloud Platform's BigQuery service using service account credentials. - - Attributes: - credentials_path (str): The file path to the service account credentials JSON file. - project_id (str): The GCP project ID. - - Methods: - __init__(credentials_path, project_id): - Initializes the GCPBigQueryConnection with the given credentials path and project ID. - - _create_client(): - - query(query_string): - Executes a SQL query on BigQuery and returns the results. 
- - - Example usage: - connection = GCPBigQueryConnection('/path/to/credentials.json', 'your-project-id') - results = connection.query('SELECT * FROM your_dataset.your_table') - for row in results: - print(row) - """ - def __init__(self, credentials_path, project_id): - self.credentials_path = credentials_path - self.project_id = project_id - self.client = self._create_client() - - def _create_client(self): - """ - Creates a BigQuery client using service account credentials. - - This method reads the service account credentials from a file specified - by `self.credentials_path` and uses them to create a BigQuery client - for the project specified by `self.project_id`. - - Returns: - bigquery.Client: An authenticated BigQuery client. - """ - credentials = service_account.Credentials.from_service_account_file(self.credentials_path) - client = bigquery.Client(credentials=credentials, project=self.project_id) - return client - - def query(self, query_string): - """ - Executes a SQL query on the GCP BigQuery client and returns the results. - - Args: - query_string (str): The SQL query to be executed. - - Returns: - google.cloud.bigquery.table.RowIterator: An iterator over the rows in the query result. - """ - query_job = self.client.query(query_string) - results = query_job.result() - return results diff --git a/local-data-platform/local_data_platform/source/near/near_transactions.sql b/local-data-platform/local_data_platform/source/near/near_transactions.sql deleted file mode 100644 index 021b89c..0000000 --- a/local-data-platform/local_data_platform/source/near/near_transactions.sql +++ /dev/null @@ -1,3 +0,0 @@ -SELECT * -FROM `bigquery-public-data.crypto_near.transactions` -LIMIT 1000; \ No newline at end of file diff --git a/local-data-platform/local_data_platform/source/near/source_near_gcp.py b/local-data-platform/local_data_platform/source/near/source_near_gcp.py deleted file mode 100644 index c11068d..0000000 --- a/local-data-platform/local_data_platform/source/near/source_near_gcp.py +++ /dev/null @@ -1,50 +0,0 @@ -from local_data_platform.source.gcp.gcp_bigquery.source_gcp_bigquery import SourceGCPBigQuery -""" -This module defines the Source_Near_GCP class, which inherits from SourceGCPBigQuery -and provides functionality to fetch transaction data from the NEAR protocol on Google Cloud Platform. - -Classes: - Source_Near_GCP: A class to fetch NEAR protocol transaction data from GCP BigQuery. - -Methods: - fetch_near_transaction(self): - Fetches NEAR protocol transaction data from the previous day. - Returns: - DataFrame: A pandas DataFrame containing the transaction data. -""" -class Source_Near_GCP(SourceGCPBigQuery): - """ - A class to represent the data source for Near transactions from Google Cloud Platform (GCP) BigQuery. - Methods - ------- - fetch_near_transaction(): - Reads an SQL query from a file and fetches Near transaction data from GCP BigQuery. - """ - """ - Reads an SQL query from a specified file and fetches Near transaction data from GCP BigQuery. - The method reads the SQL query from 'local_data_platform/source/near/queries/near_transactions.sql', - prints the query for debugging purposes, and then uses the query to fetch data from BigQuery. - Returns - ------- - DataFrame - A DataFrame containing the fetched Near transaction data. 
- """ - def fetch_near_transaction(self): - - def read_query_from_file(file_path): - with open(file_path, 'r') as file: - query = file.read() - return query - - # Path to the configuration file - query_file_path = 'local_data_platform/source/near/queries/near_transactions.sql' - - # Read the query from the file - query = read_query_from_file(query_file_path) - - print("SQL QUERY: ",query) - - return self.fetch_data(query) - - - \ No newline at end of file diff --git a/local-data-platform/local_data_platform/store/source/gcp/__init__.py b/local-data-platform/local_data_platform/store/source/gcp/__init__.py index e69de29..6e6f731 100644 --- a/local-data-platform/local_data_platform/store/source/gcp/__init__.py +++ b/local-data-platform/local_data_platform/store/source/gcp/__init__.py @@ -0,0 +1,47 @@ +from local_data_platform.store.source import Source +from pathlib import Path +import json +from local_data_platform import Credentials +from local_data_platform import logger + +logger = log() + +class GCPCredentials(Credentials): + def __init__(self, path, kwargs): + """ + todo: + path, + type, + project_id, + private_key_id, + private_key, + client_email, + client_id, + auth_uri, + token_uri, + auth_provider_x509_cert_url, + client_x509_cert_url, + universe_domain + """ + + self.path = path + self.project_id = kwargs.get("project_id") + logger.info(f"GCPCredentials initialised with {kwargs}") + super().__init__(path=self.path, project_id=self.project_id) + + def get_project_id(self): + with open(self.path, "r") as file: + data = json.load(file) + return data.get("project_id") + + +class GCP(Source): + """ + A base class for Source Store implementation + """ + + def __init__(self, name: str, path: Path): + self.name = name + self.path = path + logger.info(f"GCP initialised with {self.path}") + super().__init__(self.name, self.path) diff --git a/local-data-platform/local_data_platform/store/source/gcp/bigquery/__init__.py b/local-data-platform/local_data_platform/store/source/gcp/bigquery/__init__.py new file mode 100644 index 0000000..20096bc --- /dev/null +++ b/local-data-platform/local_data_platform/store/source/gcp/bigquery/__init__.py @@ -0,0 +1,52 @@ +import os +from local_data_platform.store.source.gcp import GCP, GCPCredentials +from google.cloud import bigquery +from google.oauth2 import service_account +from local_data_platform.format.csv import CSV +from pathlib import Path +from local_data_platform.logger import log +from pandas import DataFrame as Dataframe +from pyarrow import Table as Table + +logger = log() + + +class BigQuery(GCP): + """ + A base class for BigQuery Store implementation + """ + + def __init__(self, name: str, credentials: GCPCredentials, path: Path): + self.name = name + self.credentials = credentials + self.project_id = self.credentials.project_id + self.path = path + self.client = self._get_bigquery_client() + logger.info(f"BigQuery initialised with {self.path}") + super().__init__(self.name, self.path) + + def get(self, query: str) -> Table: + logger.info(f"Getting data from BigQuery {self.name}") + logger.info(f"Query: {query}") + try: + logger.info(f"query_job: {self.client.query(query)}") + query_job = self.client.query(query) + df = query_job.to_dataframe() + print({type(df)}) + except Exception as e: + logger.error(f"Error getting data from BigQuery {self.name}") + logger.error(e) + raise e + logger.info(f"Data from BigQuery {self.name} fetched successfully") + logger.info(f"Data: {len(df)} type: {type(df)}") + return Table.from_pandas(df) + + def 
_get_bigquery_client(self) -> bigquery.Client: + credentials = service_account.Credentials.from_service_account_file( + self.credentials.path + ) + client = bigquery.Client(credentials=credentials, project=self.project_id) + return client + + def put(self): + pass diff --git a/local-data-platform/local_data_platform/store/source/gcp/gcp_bigquery/__init__.py b/local-data-platform/local_data_platform/store/source/gcp/gcp_bigquery/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/local-data-platform/local_data_platform/store/source/json/__init__.py b/local-data-platform/local_data_platform/store/source/json/__init__.py index 60893bf..a16ca43 100644 --- a/local-data-platform/local_data_platform/store/source/json/__init__.py +++ b/local-data-platform/local_data_platform/store/source/json/__init__.py @@ -1,11 +1,17 @@ import json from local_data_platform.store.source import Source +from local_data_platform.logger import log + +logger = log() + class Json(Source): def __init__(self, *args, **kwargs): + logger.info(f"Json initialised with {kwargs}") super().__init__(*args, **kwargs) def get(self): - with open(self.path, 'r') as file: - return json.load(file) \ No newline at end of file + logger.info(f"Reading Json with path {self.path}") + with open(self.path, "r") as file: + return json.load(file) diff --git a/local-data-platform/local_data_platform/store/target/iceberg/__init__.py b/local-data-platform/local_data_platform/store/target/iceberg/__init__.py index 756f676..cf49812 100644 --- a/local-data-platform/local_data_platform/store/target/iceberg/__init__.py +++ b/local-data-platform/local_data_platform/store/target/iceberg/__init__.py @@ -3,15 +3,17 @@ from pyiceberg.schema import Schema from pyiceberg.typedef import Identifier from pyiceberg.table import Table +from local_data_platform import logger + +logger = log() + class Iceberg(Target): def __init__(self, catalog: str, *args, **kwargs): self.catalog = LocalIcebergCatalog( - catalog['identifier'], - path=catalog['warehouse_path'] + catalog["identifier"], path=catalog["warehouse_path"] ) - self.identifier = f"{catalog['identifier']}.{kwargs['name']}" self.path = kwargs["path"] self.format = kwargs["format"] @@ -19,8 +21,10 @@ def __init__(self, catalog: str, *args, **kwargs): super().__init__(*args, **kwargs) def put(self, schema: Schema) -> Table: - print(f"self.identifier {self.identifier}") - return self.catalog.create_table_if_not_exists(identifier=self.identifier, schema=schema) + logger.info(f"self.identifier {self.identifier}") + return self.catalog.create_table_if_not_exists( + identifier=self.identifier, schema=schema + ) def get_10_rows(self, catalog, name): pass diff --git a/local-data-platform/poetry.lock b/local-data-platform/poetry.lock index 2c50362..d673255 100644 --- a/local-data-platform/poetry.lock +++ b/local-data-platform/poetry.lock @@ -178,6 +178,23 @@ files = [ {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] +[[package]] +name = "db-dtypes" +version = "1.3.0" +description = "Pandas Data Types for SQL systems (BigQuery, Spanner)" +optional = false +python-versions = ">=3.7" +files = [ + {file = "db_dtypes-1.3.0-py2.py3-none-any.whl", hash = "sha256:7e65c59f849ccbe6f7bc4d0253edcc212a7907662906921caba3e4aadd0bc277"}, + {file = "db_dtypes-1.3.0.tar.gz", hash = "sha256:7bcbc8858b07474dc85b77bb2f3ae488978d1336f5ea73b58c39d9118bc3e91b"}, +] + +[package.dependencies] +numpy = ">=1.16.6" +packaging = ">=17.0" +pandas = ">=0.24.2" 
+pyarrow = ">=3.0.0" + [[package]] name = "exceptiongroup" version = "1.2.2" @@ -260,12 +277,12 @@ files = [ google-auth = ">=2.14.1,<3.0.dev0" googleapis-common-protos = ">=1.56.2,<2.0.dev0" grpcio = [ - {version = ">=1.49.1,<2.0dev", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}, {version = ">=1.33.2,<2.0dev", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""}, + {version = ">=1.49.1,<2.0dev", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}, ] grpcio-status = [ - {version = ">=1.49.1,<2.0.dev0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}, {version = ">=1.33.2,<2.0.dev0", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""}, + {version = ">=1.49.1,<2.0.dev0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}, ] proto-plus = ">=1.22.3,<2.0.0dev" protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<6.0.0.dev0" @@ -749,6 +766,92 @@ files = [ {file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"}, ] +[[package]] +name = "pandas" +version = "2.2.3" +description = "Powerful data structures for data analysis, time series, and statistics" +optional = false +python-versions = ">=3.9" +files = [ + {file = "pandas-2.2.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:1948ddde24197a0f7add2bdc4ca83bf2b1ef84a1bc8ccffd95eda17fd836ecb5"}, + {file = "pandas-2.2.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:381175499d3802cde0eabbaf6324cce0c4f5d52ca6f8c377c29ad442f50f6348"}, + {file = "pandas-2.2.3-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:d9c45366def9a3dd85a6454c0e7908f2b3b8e9c138f5dc38fed7ce720d8453ed"}, + {file = "pandas-2.2.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:86976a1c5b25ae3f8ccae3a5306e443569ee3c3faf444dfd0f41cda24667ad57"}, + {file = "pandas-2.2.3-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:b8661b0238a69d7aafe156b7fa86c44b881387509653fdf857bebc5e4008ad42"}, + {file = "pandas-2.2.3-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:37e0aced3e8f539eccf2e099f65cdb9c8aa85109b0be6e93e2baff94264bdc6f"}, + {file = "pandas-2.2.3-cp310-cp310-win_amd64.whl", hash = "sha256:56534ce0746a58afaf7942ba4863e0ef81c9c50d3f0ae93e9497d6a41a057645"}, + {file = "pandas-2.2.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:66108071e1b935240e74525006034333f98bcdb87ea116de573a6a0dccb6c039"}, + {file = "pandas-2.2.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:7c2875855b0ff77b2a64a0365e24455d9990730d6431b9e0ee18ad8acee13dbd"}, + {file = "pandas-2.2.3-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:cd8d0c3be0515c12fed0bdbae072551c8b54b7192c7b1fda0ba56059a0179698"}, + {file = "pandas-2.2.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c124333816c3a9b03fbeef3a9f230ba9a737e9e5bb4060aa2107a86cc0a497fc"}, + {file = "pandas-2.2.3-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:63cc132e40a2e084cf01adf0775b15ac515ba905d7dcca47e9a251819c575ef3"}, + {file = "pandas-2.2.3-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:29401dbfa9ad77319367d36940cd8a0b3a11aba16063e39632d98b0e931ddf32"}, + {file = "pandas-2.2.3-cp311-cp311-win_amd64.whl", hash = 
"sha256:3fc6873a41186404dad67245896a6e440baacc92f5b716ccd1bc9ed2995ab2c5"}, + {file = "pandas-2.2.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:b1d432e8d08679a40e2a6d8b2f9770a5c21793a6f9f47fdd52c5ce1948a5a8a9"}, + {file = "pandas-2.2.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a5a1595fe639f5988ba6a8e5bc9649af3baf26df3998a0abe56c02609392e0a4"}, + {file = "pandas-2.2.3-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:5de54125a92bb4d1c051c0659e6fcb75256bf799a732a87184e5ea503965bce3"}, + {file = "pandas-2.2.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fffb8ae78d8af97f849404f21411c95062db1496aeb3e56f146f0355c9989319"}, + {file = "pandas-2.2.3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:6dfcb5ee8d4d50c06a51c2fffa6cff6272098ad6540aed1a76d15fb9318194d8"}, + {file = "pandas-2.2.3-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:062309c1b9ea12a50e8ce661145c6aab431b1e99530d3cd60640e255778bd43a"}, + {file = "pandas-2.2.3-cp312-cp312-win_amd64.whl", hash = "sha256:59ef3764d0fe818125a5097d2ae867ca3fa64df032331b7e0917cf5d7bf66b13"}, + {file = "pandas-2.2.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:f00d1345d84d8c86a63e476bb4955e46458b304b9575dcf71102b5c705320015"}, + {file = "pandas-2.2.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:3508d914817e153ad359d7e069d752cdd736a247c322d932eb89e6bc84217f28"}, + {file = "pandas-2.2.3-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:22a9d949bfc9a502d320aa04e5d02feab689d61da4e7764b62c30b991c42c5f0"}, + {file = "pandas-2.2.3-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f3a255b2c19987fbbe62a9dfd6cff7ff2aa9ccab3fc75218fd4b7530f01efa24"}, + {file = "pandas-2.2.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:800250ecdadb6d9c78eae4990da62743b857b470883fa27f652db8bdde7f6659"}, + {file = "pandas-2.2.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:6374c452ff3ec675a8f46fd9ab25c4ad0ba590b71cf0656f8b6daa5202bca3fb"}, + {file = "pandas-2.2.3-cp313-cp313-win_amd64.whl", hash = "sha256:61c5ad4043f791b61dd4752191d9f07f0ae412515d59ba8f005832a532f8736d"}, + {file = "pandas-2.2.3-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:3b71f27954685ee685317063bf13c7709a7ba74fc996b84fc6821c59b0f06468"}, + {file = "pandas-2.2.3-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:38cf8125c40dae9d5acc10fa66af8ea6fdf760b2714ee482ca691fc66e6fcb18"}, + {file = "pandas-2.2.3-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:ba96630bc17c875161df3818780af30e43be9b166ce51c9a18c1feae342906c2"}, + {file = "pandas-2.2.3-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1db71525a1538b30142094edb9adc10be3f3e176748cd7acc2240c2f2e5aa3a4"}, + {file = "pandas-2.2.3-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:15c0e1e02e93116177d29ff83e8b1619c93ddc9c49083f237d4312337a61165d"}, + {file = "pandas-2.2.3-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:ad5b65698ab28ed8d7f18790a0dc58005c7629f227be9ecc1072aa74c0c1d43a"}, + {file = "pandas-2.2.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:bc6b93f9b966093cb0fd62ff1a7e4c09e6d546ad7c1de191767baffc57628f39"}, + {file = "pandas-2.2.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:5dbca4c1acd72e8eeef4753eeca07de9b1db4f398669d5994086f788a5d7cc30"}, + {file = "pandas-2.2.3-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = 
"sha256:8cd6d7cc958a3910f934ea8dbdf17b2364827bb4dafc38ce6eef6bb3d65ff09c"}, + {file = "pandas-2.2.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:99df71520d25fade9db7c1076ac94eb994f4d2673ef2aa2e86ee039b6746d20c"}, + {file = "pandas-2.2.3-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:31d0ced62d4ea3e231a9f228366919a5ea0b07440d9d4dac345376fd8e1477ea"}, + {file = "pandas-2.2.3-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:7eee9e7cea6adf3e3d24e304ac6b8300646e2a5d1cd3a3c2abed9101b0846761"}, + {file = "pandas-2.2.3-cp39-cp39-win_amd64.whl", hash = "sha256:4850ba03528b6dd51d6c5d273c46f183f39a9baf3f0143e566b89450965b105e"}, + {file = "pandas-2.2.3.tar.gz", hash = "sha256:4f18ba62b61d7e192368b84517265a99b4d7ee8912f8708660fb4a366cc82667"}, +] + +[package.dependencies] +numpy = [ + {version = ">=1.22.4", markers = "python_version < \"3.11\""}, + {version = ">=1.23.2", markers = "python_version == \"3.11\""}, + {version = ">=1.26.0", markers = "python_version >= \"3.12\""}, +] +python-dateutil = ">=2.8.2" +pytz = ">=2020.1" +tzdata = ">=2022.7" + +[package.extras] +all = ["PyQt5 (>=5.15.9)", "SQLAlchemy (>=2.0.0)", "adbc-driver-postgresql (>=0.8.0)", "adbc-driver-sqlite (>=0.8.0)", "beautifulsoup4 (>=4.11.2)", "bottleneck (>=1.3.6)", "dataframe-api-compat (>=0.1.7)", "fastparquet (>=2022.12.0)", "fsspec (>=2022.11.0)", "gcsfs (>=2022.11.0)", "html5lib (>=1.1)", "hypothesis (>=6.46.1)", "jinja2 (>=3.1.2)", "lxml (>=4.9.2)", "matplotlib (>=3.6.3)", "numba (>=0.56.4)", "numexpr (>=2.8.4)", "odfpy (>=1.4.1)", "openpyxl (>=3.1.0)", "pandas-gbq (>=0.19.0)", "psycopg2 (>=2.9.6)", "pyarrow (>=10.0.1)", "pymysql (>=1.0.2)", "pyreadstat (>=1.2.0)", "pytest (>=7.3.2)", "pytest-xdist (>=2.2.0)", "python-calamine (>=0.1.7)", "pyxlsb (>=1.0.10)", "qtpy (>=2.3.0)", "s3fs (>=2022.11.0)", "scipy (>=1.10.0)", "tables (>=3.8.0)", "tabulate (>=0.9.0)", "xarray (>=2022.12.0)", "xlrd (>=2.0.1)", "xlsxwriter (>=3.0.5)", "zstandard (>=0.19.0)"] +aws = ["s3fs (>=2022.11.0)"] +clipboard = ["PyQt5 (>=5.15.9)", "qtpy (>=2.3.0)"] +compression = ["zstandard (>=0.19.0)"] +computation = ["scipy (>=1.10.0)", "xarray (>=2022.12.0)"] +consortium-standard = ["dataframe-api-compat (>=0.1.7)"] +excel = ["odfpy (>=1.4.1)", "openpyxl (>=3.1.0)", "python-calamine (>=0.1.7)", "pyxlsb (>=1.0.10)", "xlrd (>=2.0.1)", "xlsxwriter (>=3.0.5)"] +feather = ["pyarrow (>=10.0.1)"] +fss = ["fsspec (>=2022.11.0)"] +gcp = ["gcsfs (>=2022.11.0)", "pandas-gbq (>=0.19.0)"] +hdf5 = ["tables (>=3.8.0)"] +html = ["beautifulsoup4 (>=4.11.2)", "html5lib (>=1.1)", "lxml (>=4.9.2)"] +mysql = ["SQLAlchemy (>=2.0.0)", "pymysql (>=1.0.2)"] +output-formatting = ["jinja2 (>=3.1.2)", "tabulate (>=0.9.0)"] +parquet = ["pyarrow (>=10.0.1)"] +performance = ["bottleneck (>=1.3.6)", "numba (>=0.56.4)", "numexpr (>=2.8.4)"] +plot = ["matplotlib (>=3.6.3)"] +postgresql = ["SQLAlchemy (>=2.0.0)", "adbc-driver-postgresql (>=0.8.0)", "psycopg2 (>=2.9.6)"] +pyarrow = ["pyarrow (>=10.0.1)"] +spss = ["pyreadstat (>=1.2.0)"] +sql-other = ["SQLAlchemy (>=2.0.0)", "adbc-driver-postgresql (>=0.8.0)", "adbc-driver-sqlite (>=0.8.0)"] +test = ["hypothesis (>=6.46.1)", "pytest (>=7.3.2)", "pytest-xdist (>=2.2.0)"] +xml = ["lxml (>=4.9.2)"] + [[package]] name = "pluggy" version = "1.5.0" @@ -892,8 +995,8 @@ files = [ annotated-types = ">=0.6.0" pydantic-core = "2.23.4" typing-extensions = [ - {version = ">=4.12.2", markers = "python_version >= \"3.13\""}, {version = ">=4.6.1", markers = "python_version < \"3.13\""}, + {version = ">=4.12.2", markers = 
"python_version >= \"3.13\""}, ] [package.extras] @@ -1143,6 +1246,17 @@ files = [ [package.dependencies] six = ">=1.5" +[[package]] +name = "pytz" +version = "2024.2" +description = "World timezone definitions, modern and historical" +optional = false +python-versions = "*" +files = [ + {file = "pytz-2024.2-py2.py3-none-any.whl", hash = "sha256:31c7c1817eb7fae7ca4b8c7ee50c72f93aa2dd863de768e1ef4245d426aa0725"}, + {file = "pytz-2024.2.tar.gz", hash = "sha256:2aa355083c50a0f93fa581709deac0c9ad65cca8a9e9beac660adcbd493c798a"}, +] + [[package]] name = "requests" version = "2.32.3" @@ -1281,6 +1395,17 @@ files = [ {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, ] +[[package]] +name = "tzdata" +version = "2024.2" +description = "Provider of IANA time zone data" +optional = false +python-versions = ">=2" +files = [ + {file = "tzdata-2024.2-py2.py3-none-any.whl", hash = "sha256:a48093786cdcde33cad18c2555e8532f34422074448fbc874186f0abd79565cd"}, + {file = "tzdata-2024.2.tar.gz", hash = "sha256:7d85cc416e9382e69095b7bdf4afd9e3880418a2413feec7069d533d6b4e31cc"}, +] + [[package]] name = "urllib3" version = "2.2.3" @@ -1301,4 +1426,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.0" python-versions = ">=3.9,<4.0" -content-hash = "152d6680613e2bd31143e972171c106949377d8693e3c226bce4cdc69e5e2baa" +content-hash = "d3064463c82a5efb8177adfa80140575fc5ac37773a1d4e46517306fc6817d89" diff --git a/local-data-platform/pyproject.toml b/local-data-platform/pyproject.toml index d4d17fc..800afe2 100644 --- a/local-data-platform/pyproject.toml +++ b/local-data-platform/pyproject.toml @@ -14,6 +14,8 @@ pytest = "^8.3.3" magicmock = "^0.3" google = "^3.0.0" google-cloud-bigquery = "^3.26.0" +pandas = "^2.2.3" +db-dtypes = "^1.3.0" [build-system] diff --git a/local-data-platform/real_world_use_cases/near_data_lake/config/egression.json b/local-data-platform/real_world_use_cases/near_data_lake/config/egression.json new file mode 100644 index 0000000..caeff78 --- /dev/null +++ b/local-data-platform/real_world_use_cases/near_data_lake/config/egression.json @@ -0,0 +1,26 @@ + +{ + "identifier": "near_transactions", + "who": "investor", + "what": "gain", + "where": "near", + "when": "trade", + "how": "gcp", + "metadata": { + "dataset": "near_transactions", + "target": { + "name": "transactions", + "format": "ICEBERG", + "catalog": { + "type": "LocalIceberg", + "identifier": "near_transactions", + "warehouse_path": "real_world_use_cases/near_data_lake/data" + } + }, + "source": { + "name": "near_transactions", + "format": "CSV", + "path": "/real_world_use_cases/near_data_lake/near_transactions.csv" + } + } +} \ No newline at end of file diff --git a/local-data-platform/real_world_use_cases/near_data_lake/config/ingestion.json b/local-data-platform/real_world_use_cases/near_data_lake/config/ingestion.json new file mode 100644 index 0000000..4fb002b --- /dev/null +++ b/local-data-platform/real_world_use_cases/near_data_lake/config/ingestion.json @@ -0,0 +1,27 @@ + +{ + "identifier": "near_transactions", + "who": "investor", + "what": "gain", + "where": "near", + "when": "trade", + "how": "gcp", + "metadata": { + "dataset": "near_transactions", + "source": { + "name": "transaction_query", + "format": "JSON", + "path": "/real_world_use_cases/near_data_lake/config/sample_queries/near_transaction.json", + "engine": "BIGQUERY", + "credentials": { + "name":"GCP", + "path": "/../samples/creadentials.json" + } + }, + "target": { + "name": 
"near_transactions", + "format": "CSV", + "path": "/real_world_use_cases/near_data_lake/near_transactions.csv" + } + } +} \ No newline at end of file diff --git a/local-data-platform/real_world_use_cases/near_data_lake/config/sample_queries/near_transaction.json b/local-data-platform/real_world_use_cases/near_data_lake/config/sample_queries/near_transaction.json new file mode 100644 index 0000000..b77baf0 --- /dev/null +++ b/local-data-platform/real_world_use_cases/near_data_lake/config/sample_queries/near_transaction.json @@ -0,0 +1,4 @@ +{ + "query":"SELECT * FROM `bigquery-public-data.crypto_near_mainnet_us.transactions` WHERE block_date = '2024-09-30' LIMIT 5", + "desc":"Get the first 5 transactions from the Near Protocol Mainnet for the date 2024-09-30" +} diff --git a/local-data-platform/real_world_use_cases/near_data_lake/monthly_reporting.py b/local-data-platform/real_world_use_cases/near_data_lake/monthly_reporting.py new file mode 100644 index 0000000..00bac1e --- /dev/null +++ b/local-data-platform/real_world_use_cases/near_data_lake/monthly_reporting.py @@ -0,0 +1,109 @@ +from local_data_platform.pipeline.egression.csv_to_iceberg import CSVToIceberg +from local_data_platform.pipeline.ingestion.bigquery_to_csv import BigQueryToCSV +from local_data_platform import Config, SupportedFormat, SupportedEngine +from local_data_platform.store.source.json import Json +from local_data_platform.exceptions import PipelineNotFound +import os +from local_data_platform.logger import log + + +logger = log() + + +def get_near_trasaction_dataset( + dataset="near_transactions", + config_path="/real_world_use_cases/near_data_lake/config/ingestion.json", +): + """ + Retrieves and processes the near transaction dataset based on the provided configuration. + + Args: + dataset (str): The name of the dataset to be processed. Defaults to "near_transactions". + config_path (str): The path to the configuration file. Defaults to "/real_world_use_cases/near_data_lake/config/ingestion.json". + + Raises: + PipelineNotFound: If the source and target formats specified in the configuration are not supported. + + Returns: + None + """ + + config = Config( + **Json( + name=dataset, + path=os.getcwd() + config_path, + ).get() + ) + print(config) + logger.info( + f""" + We are using the following dictionary as the configuration to generate a monthly trust metric + {config} + """ + ) + if ( + config.metadata["source"]["format"] == SupportedFormat.JSON.value + and config.metadata["target"]["format"] == SupportedFormat.CSV.value + and config.metadata["source"]["engine"] == SupportedEngine.BIGQUERY.value + ): + data_loader = BigQueryToCSV(config=config) + data_loader.load() + else: + raise PipelineNotFound( + f""" + source {config.metadata['source']['format']} + to target {config.metadata['target']['format']} + pipeline is not supported yet + """ + ) + + +def put_near_trasaction_dataset( + dataset="near_transactions", + config_path="/real_world_use_cases/near_data_lake/config/egression.json", +): + """ + Loads and processes a dataset based on the provided configuration. + + This function reads a configuration file and uses it to load and process + a dataset. It currently supports loading data from CSV format and converting + it to Iceberg format. + + Args: + dataset (str): The name of the dataset to be processed. Defaults to "near_transactions". + config_path (str): The path to the configuration file. Defaults to "/real_world_use_cases/near_data_lake/config/egression.json". 
+ + Raises: + PipelineNotFound: If the source and target formats specified in the configuration are not supported. + """ + config = Config( + **Json( + name=dataset, + path=os.getcwd() + config_path, + ).get() + ) + + logger.info( + f""" + We are using the following dictionary as the configuration to generate a monthly trust metric + {config} + """ + ) + if ( + config.metadata["source"]["format"] == SupportedFormat.CSV.value + and config.metadata["target"]["format"] == SupportedFormat.ICEBERG.value + ): + data_loader = CSVToIceberg(config=config) + data_loader.load() + else: + raise PipelineNotFound( + f""" + source {config.metadata['source']['format']} + to target {config.metadata['target']['format']} + pipeline is not supported yet + """ + ) + + +# get_near_trasaction_dataset(); +put_near_trasaction_dataset() diff --git a/local-data-platform/real_world_use_cases/near_data_lake/near_transactions.csv b/local-data-platform/real_world_use_cases/near_data_lake/near_transactions.csv new file mode 100644 index 0000000..f068f17 --- /dev/null +++ b/local-data-platform/real_world_use_cases/near_data_lake/near_transactions.csv @@ -0,0 +1,6 @@ +"block_date","block_height","block_timestamp","block_timestamp_utc","block_hash","chunk_hash","shard_id","transaction_hash","index_in_chunk","signer_account_id","signer_public_key","nonce","receiver_account_id","signature","status","converted_into_receipt_id","receipt_conversion_gas_burnt","receipt_conversion_tokens_burnt" +2024-09-30,129196288,1727654565340414465,2024-09-30 00:02:45.340414Z,"FWWUGnh8TuY3WBR41QxK57gzKGLeKwXgLXyq7RAhR9Mv","2Fn2qKWNtv9XECDfkeZX7LCQuH2unDHCjPwzYQNUmYKY",5,"HVVVZ4VQ5iV1aE2Y98Tivif4oMx2tSAjWtYfDkJfjzff",0,"xjumprewards.near","ed25519:ARGYRDF8zG39sUmtWJA2j765CLAaBdrKSUPBcboH5mcf",84292004028861,"jumptoken.jumpfinance.near","ed25519:58kUPiNpC4WwQhPb3rHAe6NeGXAEPQ9CgDjWCJBEe5uXyEtV1ZMLtjW8NH8WqrCBDAfJQoF2fBX8pWVnidEdcv29","SUCCESS_RECEIPT_ID","cQtN58nyFoHfRorZTcwG98Z4BPCGyq8PUwaXFn4W2jE",314782903815,3.14782903815e+19 +2024-09-30,129196288,1727654565340414465,2024-09-30 00:02:45.340414Z,"FWWUGnh8TuY3WBR41QxK57gzKGLeKwXgLXyq7RAhR9Mv","2Fn2qKWNtv9XECDfkeZX7LCQuH2unDHCjPwzYQNUmYKY",5,"4u8ufA9EbnN3wTFmg7WY6WrqdqANDQSu9hduQxxPfJn3",2,"watcher01.ref-watchdog.near","ed25519:ErD2sZ8PFtVUemwFnwb9pQGaUswP5KfCd7fSs8hYMQpF",67406441334296,"v2.ref-finance.near","ed25519:ADtNtsf8qM2XKNyXGzZwPLwvTEBzPq76BSzwWJx9T6QCuePE9YPxrAEBTKdTkvo8LRSQjsmnfuLPUwY6eKBrUE4","SUCCESS_RECEIPT_ID","FbKcAVYUgUDFVFUNQxoVLYqW5bJMTuM7jcLgjSBwpbHk",312446401780,3.1244640178e+19 +2024-09-30,129196288,1727654565340414465,2024-09-30 00:02:45.340414Z,"FWWUGnh8TuY3WBR41QxK57gzKGLeKwXgLXyq7RAhR9Mv","14H8iNpT7vhfVC1irRLe2upBbxyBC7gQDA1hdt6cKTub",4,"DYfq3vhYkTvPAq7BWHsU1akqZ7VYYj9U24XJsFezNk5w",0,"relay.tg","ed25519:4RPTTPDuAcozcnfZ7v2URdrALqDRtqC2mnaTeD7bQeiL",111733455606416,"7433538543.tg","ed25519:4JXwk93dKtiSG6qxDkJUt1gPywqRD3jqL1twnfKisGThgk5pynRcPkLnSVx89ns7qtf77BAG6H7uyWw5UyKMgpnh","SUCCESS_RECEIPT_ID","7JQyJ3cvspL1SrvNi7DtCv1T8Xfi2GX21diBk1w7Erk9",515831945545,5.15831945545e+19 +2024-09-30,129196288,1727654565340414465,2024-09-30 
00:02:45.340414Z,"FWWUGnh8TuY3WBR41QxK57gzKGLeKwXgLXyq7RAhR9Mv","14H8iNpT7vhfVC1irRLe2upBbxyBC7gQDA1hdt6cKTub",4,"ArVghB3QBXx4VjwiS75paniFCqaggdBmLFmC1HuiootY",1,"relay.tg","ed25519:to5svbU1SbheQ7br8kAnFbacj8Cf8GbDFzKg1VSrNwC",111733535278092,"i6407175978.tg","ed25519:51kYNScHcT6Ge6ZhzSRoPWaihjZjgPDhP4xnfv57fMuxpCcQxJUmrfZAPTM2cuuAfbLBhDPQw8b3yxfLbQdvAyKN","SUCCESS_RECEIPT_ID","EB8ku6PyNci3NpmCUHRtH76PMTNwVFrJCvumUgiou1zv",516594884985,5.16594884985e+19 +2024-09-30,129196288,1727654565340414465,2024-09-30 00:02:45.340414Z,"FWWUGnh8TuY3WBR41QxK57gzKGLeKwXgLXyq7RAhR9Mv","14H8iNpT7vhfVC1irRLe2upBbxyBC7gQDA1hdt6cKTub",4,"5A73awb5wvNBvPGxHQqQE6bTUFSnowiVBs3DyQVr5xwY",2,"tg","ed25519:3PEruYpXH8KbhZEWS2rFTNhQ211De3hzapV6ViVeNLWu",116159540145408,"tg","ed25519:3GJb2hfvzcN3ijXMtZGqueApPA7onHygsJUMQEZv4TEwaT8AgSxBgJDHzMsFsdXKFmEw2GVvyQEJCYYyb4rLRcX5","SUCCESS_RECEIPT_ID","7tYgNQKd1UVdv37DNxUzVq2utNe7YHNiQQjLWbSs4h9V",308314396476,3.08314396476e+19 diff --git a/local-data-platform/tests/test_gcp_connection.py b/local-data-platform/tests/test_gcp_connection.py index e082ab2..976706b 100644 --- a/local-data-platform/tests/test_gcp_connection.py +++ b/local-data-platform/tests/test_gcp_connection.py @@ -2,29 +2,36 @@ import tempfile import json from unittest.mock import patch, MagicMock -from local_data_platform.source.gcp.gcp_connection import GCPBigQueryConnection +from local_data_platform.store.source.gcp.bigquery import GCPBigQueryConnection + @pytest.fixture def mock_bigquery_client(): - with patch('local_data_platform.source.gcp.gcp_connection.bigquery.Client') as mock_client: + with patch( + "local_data_platform.store.source.gcp.bigquery.bigquery.Client" + ) as mock_client: yield mock_client + @pytest.fixture def mock_service_account_credentials(): - with patch('local_data_platform.source.gcp.gcp_connection.service_account.Credentials') as mock_credentials: + with patch( + "local_data_platform.store.source.gcp.bigquery.service_account.Credentials" + ) as mock_credentials: yield mock_credentials + @pytest.fixture def temp_credentials_file(): """ -Fixture that creates a temporary JSON file containing Google Cloud Platform (GCP) service account credentials. + Fixture that creates a temporary JSON file containing Google Cloud Platform (GCP) service account credentials. -This fixture generates a temporary file with mock GCP service account credentials and yields the file path. -The file is not deleted automatically after the test, allowing for inspection if needed. + This fixture generates a temporary file with mock GCP service account credentials and yields the file path. + The file is not deleted automatically after the test, allowing for inspection if needed. -Returns: - str: The file path to the temporary credentials JSON file. -""" + Returns: + str: The file path to the temporary credentials JSON file. 
+ """ credentials_data = { "type": "service_account", "project_id": "your-project-id", @@ -35,14 +42,19 @@ def temp_credentials_file(): "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", - "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/your-service-account-email%40your-project-id.iam.gserviceaccount.com" + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/your-service-account-email%40your-project-id.iam.gserviceaccount.com", } - with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.json') as temp_file: + with tempfile.NamedTemporaryFile( + delete=False, mode="w", suffix=".json" + ) as temp_file: json.dump(credentials_data, temp_file) temp_file_path = temp_file.name yield temp_file_path -def test_query(mock_service_account_credentials, mock_bigquery_client, temp_credentials_file): + +def test_query( + mock_service_account_credentials, mock_bigquery_client, temp_credentials_file +): """ Test the `query` method of the `GCPBigQueryConnection` class. @@ -65,24 +77,24 @@ def test_query(mock_service_account_credentials, mock_bigquery_client, temp_cred pytest tests/test_gcp_connection.py """ # Mock the credentials and client - mock_service_account_credentials.from_service_account_file.return_value = MagicMock() + mock_service_account_credentials.from_service_account_file.return_value = ( + MagicMock() + ) mock_client_instance = mock_bigquery_client.return_value mock_query_job = MagicMock() - mock_query_job.result.return_value = [ - {'column1': 'value1', 'column2': 'value2'} - ] + mock_query_job.result.return_value = [{"column1": "value1", "column2": "value2"}] mock_client_instance.query.return_value = mock_query_job # Initialize the GCPBigQueryConnection - connection = GCPBigQueryConnection(temp_credentials_file, 'your-project-id') + connection = GCPBigQueryConnection(temp_credentials_file, "your-project-id") # Execute the query method - result = connection.query('SELECT * FROM dataset.table') + result = connection.query("SELECT * FROM dataset.table") # Verify the results assert len(result) == 1 - assert result[0]['column1'] == 'value1' - assert result[0]['column2'] == 'value2' + assert result[0]["column1"] == "value1" + assert result[0]["column2"] == "value2" # Verify that the query method was called with the correct query string - mock_client_instance.query.assert_called_once_with('SELECT * FROM dataset.table') \ No newline at end of file + mock_client_instance.query.assert_called_once_with("SELECT * FROM dataset.table") diff --git a/samples/bigQueryTutorial.py b/samples/bigQueryTutorial.py new file mode 100644 index 0000000..6afd9ec --- /dev/null +++ b/samples/bigQueryTutorial.py @@ -0,0 +1,30 @@ +from local_data_platform.store.source.gcp.bigquery import BigQuery +from pathlib import Path +import os +from local_data_platform import Config +from local_data_platform.store.source.json import Json + + +""" +creadentials.json is the path to the credentials file that you downloaded from GCP. 
+""" +local_gcp_credential_path = Path( + os.getcwd() + "/local-data-platform/samples/creadentials.json" +) + +dataset = "near_transactions" + +config_path = Path( + os.getcwd() + + "/local-data-platform/real_world_use_cases/near_data_lake/config/ingestion.json" +) + +config = Config( + **Json( + name=dataset, + path=config_path, + ).get() +) + +store = BigQuery(credential_path=local_gcp_credential_path) +df = store.query(NEAR_TRANSACTION_QUERY.query)