Connect all configured defi_llama protocols #2797

Merged · 4 commits · Jan 17, 2025
Changes from 3 commits
@@ -3,6 +3,7 @@
 import orjson
 import pandas as pd
 from metrics_mesh.macros.oso_source import oso_source_for_pymodel
+from oso_dagster.assets.defillama import DEFI_LLAMA_PROTOCOLS, defi_llama_slug_to_name
 from sqlglot import exp
 from sqlmesh import ExecutionContext, model
 
@@ -27,26 +28,33 @@ def parse_chain_tvl(protocol: str, chain_tvls_raw: str):
     return series
 
 
-def defillama_tvl_model(protocol: str):
+def defi_llama_tvl_model(protocol: str):
     @model(
-        name=f"metrics.stg__{protocol}_tvl_events",
+        name=f"metrics.stg__{defi_llama_slug_to_name(protocol)}_tvl_events",
         is_sql=False,
         columns={
             "time": "INT64",
+            "slug": "VARCHAR",
             "protocol": "VARCHAR",
             "chain": "VARCHAR",
             "token": "VARCHAR",
             "tvl": "FLOAT",
         },
     )
-    def tvl_model(context: ExecutionContext, *args, **kwargs) -> pd.DataFrame:
+    def tvl_model(
+        context: ExecutionContext, *args, **kwargs
+    ) -> t.Iterator[pd.DataFrame]:
+        source_name = defi_llama_slug_to_name(protocol)
         # Run the query for the given protocol
-        table = oso_source_for_pymodel(context, f"bigquery.defillama_tvl.{protocol}")
+        table = oso_source_for_pymodel(context, f"bigquery.defillama_tvl.{source_name}")
         df = context.fetchdf(
             exp.select("chain_tvls")
             .from_(table)
             .sql(dialect=context.engine_adapter.dialect)
         )
+        if df.empty:
+            yield from ()
+            return
         # Parse the chain tvls
         result = pd.DataFrame(
             [
@@ -56,11 +64,15 @@ def tvl_model(context: ExecutionContext, *args, **kwargs) -> pd.DataFrame:
             ],
             columns=["time", "protocol", "chain", "token", "tvl"],  # type: ignore
         )
-        return result
+        if result.empty:
+            yield from ()
+            return
+        result["slug"] = protocol
+        yield result
 
 
-def defillama_tvl_factory(protocols: t.List[str]):
-    return [defillama_tvl_model(protocol) for protocol in protocols]
+def defi_llama_tvl_factory(protocols: t.List[str]):
+    return [defi_llama_tvl_model(protocol) for protocol in protocols]
 
 
-defillama_tvl_factory(["contango_protocol"])
+defi_llama_tvl_factory(DEFI_LLAMA_PROTOCOLS)
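
With this change the model is registered under a name derived from defi_llama_slug_to_name and becomes a generator: it yields DataFrames and yields nothing when either the source table or the parsed result is empty, instead of returning a single frame. A minimal, SQLMesh-free sketch of that pattern (fetch_chain_tvls below is a hypothetical stand-in for the context.fetchdf call in the diff):

import typing as t

import pandas as pd


def fetch_chain_tvls(protocol: str) -> pd.DataFrame:
    # Hypothetical stand-in for the warehouse query; returns an empty frame here.
    return pd.DataFrame(columns=["time", "protocol", "chain", "token", "tvl"])


def tvl_events(protocol: str) -> t.Iterator[pd.DataFrame]:
    df = fetch_chain_tvls(protocol)
    if df.empty:
        # Mirror the diff's "yield from ()": emit no chunks rather than an empty frame.
        yield from ()
        return
    df["slug"] = protocol
    yield df


# An empty source produces zero chunks, so downstream code never sees an empty frame.
print(len(list(tvl_events("aave-v2"))))  # 0 with the stand-in above
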
58 changes: 43 additions & 15 deletions warehouse/metrics_tools/local/utils.py
@@ -5,13 +5,14 @@
 import duckdb
 import pyarrow as pa
 from google.cloud import bigquery
+from oso_dagster.assets.defillama import DEFI_LLAMA_PROTOCOLS, defi_llama_slug_to_name
 from sqlglot import exp
 from sqlmesh.core.dialect import parse_one
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
-#logger.addHandler(logging.StreamHandler(sys.stdout))
+# logger.addHandler(logging.StreamHandler(sys.stdout))
 
 project_id = os.getenv("GOOGLE_PROJECT_ID")
 
@@ -89,6 +90,22 @@ def bq_to_duckdb(table_mapping: t.Dict[str, str], duckdb_path: str):
     created_schemas = set()
 
     for bq_table, duckdb_table in table_mapping.items():
+        logger.info(f"checking if {duckdb_table} already exists")
+
+        duckdb_table_exp = exp.to_table(duckdb_table)
+
+        response = conn.query(
+            f"""
+            SELECT 1
+            FROM information_schema.tables
+            WHERE table_schema = '{duckdb_table_exp.db}'
+            AND table_name = '{duckdb_table_exp.this}'
+            """
+        )
+        if len(response.fetchall()) > 0:
+            logger.info(f"{duckdb_table} already exists, skipping")
+            continue
+
         logger.info(f"{bq_table}: copying to {duckdb_table}")
         table = bigquery.TableReference.from_string(bq_table)
         rows = bqclient.list_rows(table)
@@ -114,11 +131,12 @@ def bq_to_duckdb(table_mapping: t.Dict[str, str], duckdb_path: str):
 
         duckdb_table_split = duckdb_table.split(".")
         schema = duckdb_table_split[0]
 
         if schema not in created_schemas:
             conn.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
             created_schemas.add(schema)
-        if is_simple_copy:
+
+        if is_simple_copy:
             conn.execute(
                 f"CREATE TABLE IF NOT EXISTS {duckdb_table} AS SELECT * FROM table_as_arrow"
             )
@@ -163,20 +181,30 @@ def bq_to_duckdb(table_mapping: t.Dict[str, str], duckdb_path: str):
 
 
 def initialize_local_duckdb(path: str):
+    # Use the oso_dagster assets as the source of truth for configured defi
+    # llama protocols for now
+    defi_llama_tables = {
+        f"opensource-observer.defillama_tvl.{defi_llama_slug_to_name(slug)}": f"sources_defillama_tvl.{defi_llama_slug_to_name(slug)}"
+        for slug in DEFI_LLAMA_PROTOCOLS
+    }
+
+    table_mapping = {
+        # We need to rename this once we run the oso_playground dbt again
"opensource-observer.oso_playground.artifacts_by_project_v1": "sources.artifacts_by_project_v1",
"opensource-observer.oso_playground.int_artifacts_in_ossd_by_project": "sources.int_artifacts_in_ossd_by_project",
"opensource-observer.oso_playground.int_superchain_potential_bots": "sources.int_superchain_potential_bots",
"opensource-observer.oso_playground.package_owners_v0": "sources.package_owners_v0",
"opensource-observer.oso_playground.projects_by_collection_v1": "sources.projects_by_collection_v1",
"opensource-observer.oso_playground.projects_v1": "sources.projects_v1",
"opensource-observer.oso_playground.sboms_v0": "sources.sboms_v0",
"opensource-observer.oso_playground.timeseries_events_by_artifact_v0": "sources.timeseries_events_by_artifact_v0",
"opensource-observer.oso_playground.timeseries_events_aux_issues_by_artifact_v0": "sources.timeseries_events_aux_issues_by_artifact_v0",
}

table_mapping.update(defi_llama_tables)

bq_to_duckdb(
{
# We need to rename this once we run the oso_playground dbt again
"opensource-observer.oso_playground.artifacts_by_project_v1": "sources.artifacts_by_project_v1",
"opensource-observer.oso_playground.int_artifacts_in_ossd_by_project": "sources.int_artifacts_in_ossd_by_project",
"opensource-observer.oso_playground.int_superchain_potential_bots": "sources.int_superchain_potential_bots",
"opensource-observer.oso_playground.package_owners_v0": "sources.package_owners_v0",
"opensource-observer.oso_playground.projects_by_collection_v1": "sources.projects_by_collection_v1",
"opensource-observer.oso_playground.projects_v1": "sources.projects_v1",
"opensource-observer.oso_playground.sboms_v0": "sources.sboms_v0",
"opensource-observer.oso_playground.timeseries_events_by_artifact_v0": "sources.timeseries_events_by_artifact_v0",
"opensource-observer.oso_playground.timeseries_events_aux_issues_by_artifact_v0": "sources.timeseries_events_aux_issues_by_artifact_v0",
"opensource-observer.defillama_tvl.contango_protocol": "sources_defillama_tvl.contango_protocol",
},
table_mapping,
path,
)
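
The new block at the top of the copy loop checks DuckDB's information_schema before pulling anything from BigQuery, so tables that were already materialized locally are skipped. A standalone sketch of that check, assuming only the duckdb and sqlglot packages (the schema and table names are illustrative):

import duckdb
from sqlglot import exp

conn = duckdb.connect()  # the real code opens the local DuckDB file instead
conn.execute("CREATE SCHEMA IF NOT EXISTS sources_defillama_tvl")
conn.execute("CREATE TABLE sources_defillama_tvl.aave_v2_protocol (tvl DOUBLE)")

duckdb_table_exp = exp.to_table("sources_defillama_tvl.aave_v2_protocol")
rows = conn.execute(
    """
    SELECT 1
    FROM information_schema.tables
    WHERE table_schema = ? AND table_name = ?
    """,
    [duckdb_table_exp.db, duckdb_table_exp.name],
).fetchall()

print("already exists, skipping" if rows else "copying from BigQuery")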

8 changes: 7 additions & 1 deletion warehouse/oso_dagster/assets/defillama.py
@@ -2,6 +2,11 @@
 
 from ..factories.rest import create_rest_factory_asset
 
+
+def defi_llama_slug_to_name(slug: str) -> str:
+    return f"{slug.replace('-', '_').replace(".", '__dot__')}_protocol"
+
+
 DEFI_LLAMA_PROTOCOLS = [
     "aave-v1",
     "aave-v2",
@@ -32,7 +37,7 @@
     "resources": list(
         map(
             lambda protocol: {
-                "name": f"{protocol.replace('-', '_').replace(".", '__dot__')}_protocol",
+                "name": defi_llama_slug_to_name(protocol),
                 "endpoint": {
                     "path": f"protocol/{protocol}",
                     "data_selector": "$",
@@ -43,6 +48,7 @@
     ),
 }
 
+
 dlt_assets = create_rest_factory_asset(
     config=config,
 )
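
defi_llama_slug_to_name is now the single place where a DefiLlama slug is turned into the resource and table name used by both the Dagster REST assets and the SQLMesh models. A standalone copy of the helper with two worked examples ("uniswap-v2.5" is a hypothetical slug, included only to show the dot rule; the nested quotes are normalized here so the line parses on Pythons older than 3.12):

def defi_llama_slug_to_name(slug: str) -> str:
    # Dashes become underscores, dots become "__dot__", then a "_protocol" suffix.
    return f"{slug.replace('-', '_').replace('.', '__dot__')}_protocol"


print(defi_llama_slug_to_name("aave-v2"))       # aave_v2_protocol
print(defi_llama_slug_to_name("uniswap-v2.5"))  # uniswap_v2__dot__5_protocol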