Skip to content

Commit

Permalink
Connect all configured defi_llama protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
ravenac95 committed Jan 17, 2025
1 parent 3180a77 commit b0c8d5d
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import orjson
import pandas as pd
from metrics_mesh.macros.oso_source import oso_source_for_pymodel
from oso_dagster.assets.defillama import DEFI_LLAMA_PROTOCOLS, defi_llama_slug_to_name
from sqlglot import exp
from sqlmesh import ExecutionContext, model

Expand All @@ -27,21 +28,23 @@ def parse_chain_tvl(protocol: str, chain_tvls_raw: str):
return series


def defillama_tvl_model(protocol: str):
def defi_llama_tvl_model(protocol: str):
@model(
name=f"metrics.stg__{protocol}_tvl_events",
is_sql=False,
columns={
"time": "INT64",
"slug": "VARCHAR",
"protocol": "VARCHAR",
"chain": "VARCHAR",
"token": "VARCHAR",
"tvl": "FLOAT",
},
)
def tvl_model(context: ExecutionContext, *args, **kwargs) -> pd.DataFrame:
source_name = defi_llama_slug_to_name(protocol)
# Run the query for the given protocol
table = oso_source_for_pymodel(context, f"bigquery.defillama_tvl.{protocol}")
table = oso_source_for_pymodel(context, f"bigquery.defillama_tvl.{source_name}")
df = context.fetchdf(
exp.select("chain_tvls")
.from_(table)
Expand All @@ -56,11 +59,12 @@ def tvl_model(context: ExecutionContext, *args, **kwargs) -> pd.DataFrame:
],
columns=["time", "protocol", "chain", "token", "tvl"], # type: ignore
)
result["slug"] = protocol
return result


def defillama_tvl_factory(protocols: t.List[str]):
return [defillama_tvl_model(protocol) for protocol in protocols]
def defi_llama_tvl_factory(protocols: t.List[str]):
return [defi_llama_tvl_model(protocol) for protocol in protocols]


defillama_tvl_factory(["contango_protocol"])
defi_llama_tvl_factory(DEFI_LLAMA_PROTOCOLS)
58 changes: 43 additions & 15 deletions warehouse/metrics_tools/local/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
import duckdb
import pyarrow as pa
from google.cloud import bigquery
from oso_dagster.assets.defillama import DEFI_LLAMA_PROTOCOLS, defi_llama_slug_to_name
from sqlglot import exp
from sqlmesh.core.dialect import parse_one

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
#logger.addHandler(logging.StreamHandler(sys.stdout))
# logger.addHandler(logging.StreamHandler(sys.stdout))

project_id = os.getenv("GOOGLE_PROJECT_ID")

Expand Down Expand Up @@ -89,6 +90,22 @@ def bq_to_duckdb(table_mapping: t.Dict[str, str], duckdb_path: str):
created_schemas = set()

for bq_table, duckdb_table in table_mapping.items():
logger.info(f"checking if {duckdb_table} already exists")

duckdb_table_exp = exp.to_table(duckdb_table)

response = conn.query(
f"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = '{duckdb_table_exp.db}'
AND table_name = '{duckdb_table_exp.this}'
"""
)
if len(response.fetchall()) > 0:
logger.info(f"{duckdb_table} already exists, skipping")
continue

logger.info(f"{bq_table}: copying to {duckdb_table}")
table = bigquery.TableReference.from_string(bq_table)
rows = bqclient.list_rows(table)
Expand All @@ -114,11 +131,12 @@ def bq_to_duckdb(table_mapping: t.Dict[str, str], duckdb_path: str):

duckdb_table_split = duckdb_table.split(".")
schema = duckdb_table_split[0]

if schema not in created_schemas:
conn.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
created_schemas.add(schema)
if is_simple_copy:

if is_simple_copy:
conn.execute(
f"CREATE TABLE IF NOT EXISTS {duckdb_table} AS SELECT * FROM table_as_arrow"
)
Expand Down Expand Up @@ -163,20 +181,30 @@ def bq_to_duckdb(table_mapping: t.Dict[str, str], duckdb_path: str):


def initialize_local_duckdb(path: str):
# Use the oso_dagster assets as the source of truth for configured defi
# llama protocols for now
defi_llama_tables = {
f"opensource-observer.defillama_tvl.{defi_llama_slug_to_name(slug)}": f"sources_defillama_tvl.{defi_llama_slug_to_name(slug)}"
for slug in DEFI_LLAMA_PROTOCOLS
}

table_mapping = {
# We need to rename this once we run the oso_playground dbt again
"opensource-observer.oso_playground.artifacts_by_project_v1": "sources.artifacts_by_project_v1",
"opensource-observer.oso_playground.int_artifacts_in_ossd_by_project": "sources.int_artifacts_in_ossd_by_project",
"opensource-observer.oso_playground.int_superchain_potential_bots": "sources.int_superchain_potential_bots",
"opensource-observer.oso_playground.package_owners_v0": "sources.package_owners_v0",
"opensource-observer.oso_playground.projects_by_collection_v1": "sources.projects_by_collection_v1",
"opensource-observer.oso_playground.projects_v1": "sources.projects_v1",
"opensource-observer.oso_playground.sboms_v0": "sources.sboms_v0",
"opensource-observer.oso_playground.timeseries_events_by_artifact_v0": "sources.timeseries_events_by_artifact_v0",
"opensource-observer.oso_playground.timeseries_events_aux_issues_by_artifact_v0": "sources.timeseries_events_aux_issues_by_artifact_v0",
}

table_mapping.update(defi_llama_tables)

bq_to_duckdb(
{
# We need to rename this once we run the oso_playground dbt again
"opensource-observer.oso_playground.artifacts_by_project_v1": "sources.artifacts_by_project_v1",
"opensource-observer.oso_playground.int_artifacts_in_ossd_by_project": "sources.int_artifacts_in_ossd_by_project",
"opensource-observer.oso_playground.int_superchain_potential_bots": "sources.int_superchain_potential_bots",
"opensource-observer.oso_playground.package_owners_v0": "sources.package_owners_v0",
"opensource-observer.oso_playground.projects_by_collection_v1": "sources.projects_by_collection_v1",
"opensource-observer.oso_playground.projects_v1": "sources.projects_v1",
"opensource-observer.oso_playground.sboms_v0": "sources.sboms_v0",
"opensource-observer.oso_playground.timeseries_events_by_artifact_v0": "sources.timeseries_events_by_artifact_v0",
"opensource-observer.oso_playground.timeseries_events_aux_issues_by_artifact_v0": "sources.timeseries_events_aux_issues_by_artifact_v0",
"opensource-observer.defillama_tvl.contango_protocol": "sources_defillama_tvl.contango_protocol",
},
table_mapping,
path,
)

Expand Down
8 changes: 7 additions & 1 deletion warehouse/oso_dagster/assets/defillama.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

from ..factories.rest import create_rest_factory_asset


def defi_llama_slug_to_name(slug: str) -> str:
return f"{slug.replace('-', '_').replace(".", '__dot__')}_protocol"


DEFI_LLAMA_PROTOCOLS = [
"aave-v1",
"aave-v2",
Expand Down Expand Up @@ -32,7 +37,7 @@
"resources": list(
map(
lambda protocol: {
"name": f"{protocol.replace('-', '_').replace(".", '__dot__')}_protocol",
"name": defi_llama_slug_to_name(protocol),
"endpoint": {
"path": f"protocol/{protocol}",
"data_selector": "$",
Expand All @@ -43,6 +48,7 @@
),
}


dlt_assets = create_rest_factory_asset(
config=config,
)
Expand Down

0 comments on commit b0c8d5d

Please sign in to comment.