feat(api): creating the data_models app to manage the data models use… (#564)

* feat(api): creating the data_models app to manage the data models used for caching of the data-models API

* feat(api): have reworked 'scorer_dump_data_parquet' and added new command 'scorer_dump_data_eth_model_score'. Have also updated docker-compose

* feat(api): adding test for 'scorer_dump_data_eth_model_score'

* fix(api): fix broken command 'scorer_dump_data_parquet'

* feat(api): infra changes, fixes, also tagging as not managed by django

* fix(api): fix test 'test_cmd_scorer_dump_data_eth_model_score', and move it to 'data_model' app

---------

Co-authored-by: Gerald Iakobinyi-Pich <gerald@gitcoin.co>
nutrina and Gerald Iakobinyi-Pich authored Apr 12, 2024
1 parent e297172 commit 54dd4fd
Showing 20 changed files with 677 additions and 335 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -10,3 +10,6 @@ api/*.sqlite3
**/node_modules/**

postgres_db_data
postgres_db_passport_data

.DS_Store
4 changes: 3 additions & 1 deletion api/.env-sample
@@ -3,7 +3,9 @@ LOG_SQL_QUERIES=False
SECRET_KEY=this_should_be_a_super_secret_key
DATABASE_URL=sqlite:///db.sqlite3
READ_REPLICA_0_URL=sqlite:///db.sqlite3
DATABASE_URL_FOR_DOCKER=postgres://passport_scorer:passport_scorer_pwd@postgres:5432/passport_scorer
# Database URL for the data model app - this is used by the data model APIs developed by the data team
DATA_MODEL_DATABASE_URL=sqlite:///db_data_model.sqlite3
# DATABASE_URL_FOR_DOCKER=postgres://passport_scorer:passport_scorer_pwd@postgres:5432/passport_scorer
ALLOWED_HOSTS='[]'
TEST_MNEMONIC=test val is here ...
UI_DOMAINS=[localhost:3000, www.localhost:3000]
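For context, a minimal sketch of how DATA_MODEL_DATABASE_URL might be wired into a second database alias in the Django settings; the settings module is not part of this diff, and both the dj-database-url dependency and the alias name are assumptions:

# settings.py (hypothetical sketch, not part of this diff)
import os

import dj_database_url

DATABASES = {
    "default": dj_database_url.parse(os.environ["DATABASE_URL"]),
    # Separate connection for the data_model app's cached data-models API tables
    "data_model": dj_database_url.parse(os.environ["DATA_MODEL_DATABASE_URL"]),
}

A database router would normally be needed to route data_model queries to that alias; that wiring is outside this excerpt.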
@@ -0,0 +1,130 @@
import json
import traceback
from urllib.parse import urlparse


from django.core.management.base import BaseCommand
from scorer.export_utils import (
    export_data_for_model,
    upload_to_s3,
)
from data_model.models import Cache
from contextlib import contextmanager
from django.core.serializers.json import DjangoJSONEncoder
from logging import getLogger

log = getLogger(__name__)


def get_writer(output_file):
    @contextmanager
    def eth_stamp_writer_context_manager(queryset):
        try:
            with open(output_file, "w", encoding="utf-8") as file:

                class WriterWrappe:
                    def __init__(self, file):
                        self.file = file

                    def write_batch(self, data):
                        for d in data:
                            try:
                                key = json.loads(d["key"])
                                value = d["value"]
                                address = key[1].lower()
                                self.file.write(
                                    json.dumps(
                                        {
                                            "address": address,
                                            "data": {
                                                "score": str(
                                                    value["data"]["human_probability"]
                                                )
                                            },
                                            "updated_at": d["updated_at"],
                                        },
                                        cls=DjangoJSONEncoder,
                                    )
                                    + "\n"
                                )
                            except Exception:
                                log.error(
                                    f"Error when writing record '{d}'", exc_info=True
                                )

                yield WriterWrappe(file)
        finally:
            pass

    return eth_stamp_writer_context_manager


class Command(BaseCommand):
    help = "Export eth-model score to jsonl"

    def add_arguments(self, parser):
        parser.add_argument(
            "--batch-size",
            type=int,
            default=1000,
            help="""Size of record batches.
            If present, this will read the records in batches. The result list is ordered by pk (id), to get
            to the next batch we query by id__gt=last_id.
            """,
        )
        parser.add_argument(
            "--s3-uri", type=str, help="The S3 URI target location for the files"
        )

        parser.add_argument("--filename", type=str, help="The output filename")

        parser.add_argument(
            "--s3-extra-args",
            type=str,
            help="""JSON object, that contains extra args for the files uploaded to S3.
            This will be passed in as the `ExtraArgs` parameter to boto3's upload_file method.""",
        )

    def handle(self, *args, **options):
        batch_size = options["batch_size"]
        s3_uri = options["s3_uri"]
        filename = options["filename"]

        extra_args = (
            json.loads(options["s3_extra_args"]) if options["s3_extra_args"] else None
        )

        self.stdout.write(f"EXPORT - s3_uri : '{s3_uri}'")
        self.stdout.write(f"EXPORT - batch_size : '{batch_size}'")
        self.stdout.write(f"EXPORT - filename : '{filename}'")

        parsed_uri = urlparse(s3_uri)
        s3_bucket_name = parsed_uri.netloc
        s3_folder = parsed_uri.path.strip("/")

        try:
            export_data_for_model(
                Cache.objects.all(),
                "key",
                batch_size,
                get_writer(filename),
                jsonfields_as_str=False,
            )

            self.stdout.write(
                self.style.SUCCESS(f"EXPORT - Data exported to '{filename}'")
            )

            upload_to_s3(filename, s3_folder, s3_bucket_name, extra_args)

            self.stdout.write(
                self.style.SUCCESS(
                    f"EXPORT - Data uploaded to '{s3_bucket_name}/{s3_folder}/{filename}'"
                )
            )

        except Exception as e:
            self.stdout.write(
                self.style.ERROR(f"EXPORT - Error when exporting data '{e}'")
            )
            self.stdout.write(self.style.ERROR(traceback.format_exc()))
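For reference, this is how the new command might be invoked via Django's call_command; the bucket name, key prefix, and filename below are placeholders, not values taken from this commit:

from django.core.management import call_command

# Hypothetical invocation of the new export command (placeholder bucket/filename).
call_command(
    "scorer_dump_data_eth_model_score",
    filename="eth_model_score.jsonl",
    s3_uri="s3://example-bucket/eth-model-scores",
    batch_size=1000,
    s3_extra_args='{"ACL": "bucket-owner-full-control"}',
)

Each Cache row is written as one JSONL line of the form {"address": ..., "data": {"score": ...}, "updated_at": ...}, and the file is then uploaded to the given S3 location.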
165 changes: 31 additions & 134 deletions api/ceramic_cache/management/commands/scorer_dump_data_parquet.py
@@ -1,32 +1,15 @@
import json
import traceback
from pprint import pprint
from urllib.parse import urlparse

import boto3
import pyarrow as pa
import pyarrow.parquet as pq
from ceramic_cache.models import CeramicCache
from django.apps import apps
from django.conf import settings
from django.core.management.base import BaseCommand
from django.db import DEFAULT_DB_ALIAS, connection
from registry.models import Passport
from tqdm import tqdm

# The following mapping will map django field types to pyarrow types
pa_schema_map = {
"AutoField": {"pa_type": pa.int64()},
"BigAutoField": {"pa_type": pa.int64()},
# "ForeignKey": {"pa_type": pa.int64()},
# "OneToOneField": {"pa_type": pa.int64()},
"CharField": {"pa_type": pa.string()},
"JSONField": {"pa_type": pa.string(), "map_value": str},
"DateTimeField": {"pa_type": pa.timestamp("ms")},
"IntegerField": {"pa_type": pa.int64()},
"BooleanField": {"pa_type": pa.bool_()},
"DecimalField": {"pa_type": pa.decimal256(18, 9)},
}
from django.db import DEFAULT_DB_ALIAS
from scorer.export_utils import (
export_data_for_model,
writer_context_manager,
upload_to_s3,
)


class Command(BaseCommand):
@@ -62,45 +45,18 @@ def add_arguments(self, parser):
help="""JSON object, that contains extra args for the files uploaded to S3.
This will be passed in as the `ExtraArgs` parameter to boto3's upload_file method.""",
)

def get_pa_schema(self, model):
schema = pa.schema(
[
# We need to take into consideration that for relation fields, we actually need the `fieldname + "_id"`
(field.name, self.map_to_pa_schema_field(field.get_internal_type()))
if not field.is_relation
else (
f"{field.name}_id",
self.map_to_pa_schema_field(field.target_field.get_internal_type()),
)
for field in model._meta.fields
]
)
return schema

def map_to_pa_schema_field(self, django_internal_field_type):
if django_internal_field_type in pa_schema_map:
return pa_schema_map[django_internal_field_type]["pa_type"]

self.stdout.write(
self.style.WARNING(
f"Unmapped django field type '{django_internal_field_type}'. Will default to `string`"
)
parser.add_argument(
"--sort-field",
type=str,
help="""The field used to sort and batch the export. This is typically the id, but can be any unique field.""",
default="id",
)
return pa.string()

    def get_data(self, model, last_id):
        q = model.objects.using(self.database).all().order_by("id")

        if last_id:
            q = q.filter(id__gt=last_id)

        data = q[: self.batch_size].values()

        # In case of JSONField, the value will be a dict, which is not possible to serialize to Parquet
        # This is why we serialize that to JSON
        data = [
            {k: v if type(v) is not dict else json.dumps(v) for k, v in d.items()}
            for d in data
        ]

        return (self.get_pa_schema(model), data)

    def export_data_for_model(self, model, s3_folder, s3_bucket_name, extra_args):
        schema, data = self.get_data(model, None)
        # Define the output Parquet file
        table_name = model._meta.db_table
        output_file = f"{table_name}.parquet"

        # Export data for the model
        with tqdm(
            unit="records",
            unit_scale=True,
            desc=f"Exporting records of {table_name}",
        ) as progress_bar:
            if data:
                progress_bar.update(len(data))
                with pq.ParquetWriter(output_file, schema) as writer:
                    batch = pa.RecordBatch.from_pylist(data, schema=schema)
                    writer.write_batch(batch)

                    has_more = True
                    last_id = data[-1]["id"]

                    while has_more:
                        _, data = self.get_data(model, last_id)

                        if data:
                            progress_bar.update(len(data))
                            has_more = True
                            last_id = data[-1]["id"]

                            # Write the Pandas DataFrame to a Parquet file
                            batch = pa.RecordBatch.from_pylist(data, schema=schema)
                            writer.write_batch(batch)
                        else:
                            has_more = False

        self.stdout.write(
            self.style.SUCCESS(f"EXPORT - Data exported to '{output_file}'")
        )

        s3_key = f"{s3_folder}/{output_file}"

        # Upload to S3 bucket
        self.s3.upload_file(
            output_file,
            s3_bucket_name,
            s3_key,
            ExtraArgs=extra_args,
        )

        self.stdout.write(
            self.style.SUCCESS(
                f"EXPORT - Data uploaded to '{s3_bucket_name}/{s3_key}'"
            )
        )

    def handle(self, *args, **options):
        last_id = 0
        has_more = True

        self.batch_size = options["batch_size"]
        self.s3_uri = options["s3_uri"]
        self.database = options["database"]
        self.sort_field = options["sort_field"]
        apps_to_export = options["apps"].split(",") if options["apps"] else None
        extra_args = (
            json.parse(options["s3_extra_args"]) if options["s3_extra_args"] else None
@@ -185,12 +74,6 @@ def handle(self, *args, **options):
        s3_bucket_name = parsed_uri.netloc
        s3_folder = parsed_uri.path.strip("/")

        self.s3 = boto3.client(
            "s3",
            aws_access_key_id=settings.S3_DATA_AWS_SECRET_KEY_ID,
            aws_secret_access_key=settings.S3_DATA_AWS_SECRET_ACCESS_KEY,
        )

        for app_name in apps_to_export:
            self.stdout.write(f"EXPORT - START export data for app: '{app_name}'")
            # Get the app's configuration
@@ -201,11 +84,25 @@
f"EXPORT - START export data for model: '{app_name}.{model._meta.model_name}'"
)
try:
self.export_data_for_model(
model,
s3_folder,
s3_bucket_name,
extra_args,
table_name = model._meta.db_table
output_file = f"{table_name}.parquet"
export_data_for_model(
model.objects.all().using(self.database),
self.sort_field,
self.batch_size,
writer_context_manager,
)

self.stdout.write(
self.style.SUCCESS(f"EXPORT - Data exported to '{output_file}'")
)

upload_to_s3(output_file, s3_folder, s3_bucket_name, extra_args)

self.stdout.write(
self.style.SUCCESS(
f"EXPORT - Data uploaded to '{s3_bucket_name}/{s3_folder}/{output_file}'"
)
)

except Exception as e:
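For comparison, a hypothetical invocation of the reworked parquet export; the app list, database alias, and S3 URI are placeholders, and some argument definitions (for example --apps and --database) sit in the collapsed portion of this diff:

from django.core.management import call_command

# Hypothetical invocation of the reworked parquet export (placeholder values).
call_command(
    "scorer_dump_data_parquet",
    apps="registry,ceramic_cache",
    database="default",
    s3_uri="s3://example-bucket/parquet-exports",
    batch_size=1000,
    sort_field="id",  # new option in this commit; any unique field can drive batching
)

One parquet file per exported model table is written locally and then uploaded via the shared upload_to_s3 helper.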
Empty file added api/data_model/__init__.py
Empty file.
21 changes: 21 additions & 0 deletions api/data_model/admin.py
@@ -0,0 +1,21 @@
from django.contrib import admin

from scorer.scorer_admin import ScorerModelAdmin
from data_model.models import Cache


@admin.register(Cache)
class CacheAdmin(ScorerModelAdmin):
    list_display = [
        "key",
        "value",
        "updated_at",
    ]

    list_filter = []

    search_fields = [
        "key",
        "value",
    ]
    search_help_text = "Search by: " + ", ".join(search_fields)
6 changes: 6 additions & 0 deletions api/data_model/apps.py
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class DataModelConfig(AppConfig):
    default_auto_field = "django.db.models.BigAutoField"
    name = "data_model"