Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Poe Initial Script's Issues #32

Merged
merged 2 commits into from
Sep 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 47 additions & 11 deletions src/scripts/poe_initial.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import asyncio
from asyncio import Queue
from dataclasses import dataclass
import datetime as dt
from decimal import Decimal
import json
import os
import time
from typing import Any, cast

import beanie
import beanie.operators
from httpx import AsyncClient, RequestError
from loguru import logger
import motor
from pydantic import BaseModel, Field, computed_field
import pydantic
import pymongo

from src.config.services import connect_to_mongodb
from src.models import document_models
Expand Down Expand Up @@ -55,8 +60,6 @@ class Receive(BaseModel):
pay: Pay | None = None
receive: Receive | None = None
metadata: CurrencyItemMetadata | None = None
# paySparkLine: ItemSparkline | None = None
# receiveSparkLine: ItemSparkline | None = None
chaosEquivalent: Decimal = Decimal(0)


Expand Down Expand Up @@ -138,8 +141,8 @@ def low_confidence(self) -> bool:


API_BASE_URL = "https://poe.ninja/api/data"

BATCH_INSERT_LIMIT = 15_000
LEAGUE = "Settlers"


async def save_item_categories():
Expand All @@ -148,8 +151,10 @@ async def save_item_categories():

for group, categories in CATEGORY_GROUP_MAP.items():
for category in categories:
item_category = ItemCategory(name=category.name, internal_name=category.internal_name, group=group)
await ItemCategory.save(item_category)
await ItemCategory.find_one(ItemCategory.name == category.name).upsert(
beanie.operators.Set({ItemCategory.updated_time: dt.datetime.now(dt.UTC)}),
on_insert=ItemCategory(name=category.name, internal_name=category.internal_name, group=group),
) # type: ignore


def write_item_data_to_disk(group: str, category_name: str, data: dict[str, Any]):
Expand Down Expand Up @@ -212,9 +217,8 @@ async def get_item_api_data(internal_category_name: str, client: AsyncClient) ->
"""Gets data for all Items belonging to a category from the apt Poe Ninja API by preparing and calling the API
endpoint, then parsing and returning the item data for the category."""

api_endpoint = "currencyoverview" if internal_category_name == "Currency" else "itemoverview"
league = "Settlers"
url = f"/{api_endpoint}?league={league}&type={internal_category_name}"
api_endpoint = "currencyoverview" if internal_category_name in ["Currency", "Fragment"] else "itemoverview"
url = f"/{api_endpoint}?league={LEAGUE}&type={internal_category_name}"

item_data = []
currency_item_metadata = []
Expand All @@ -227,6 +231,9 @@ async def get_item_api_data(internal_category_name: str, client: AsyncClient) ->
else:
json_response = response.json()
item_data: list[dict] = json_response["lines"]
if len(item_data) < 2:
logger.error(f"no data found for '{internal_category_name}' with endpoint: '{api_endpoint}'")

currency_item_metadata: list[dict] = json_response.get("currencyDetails", [])

api_item_data = ApiItemData(item_data, currency_item_metadata)
Expand Down Expand Up @@ -328,6 +335,7 @@ def prepare_item_record(
listings=item_entity.listingCount,
low_confidence=item_entity.low_confidence,
)
# TODO: save baseType too
item_record = Item(
poe_ninja_id=item_entity.id_,
name=item_entity.name,
Expand Down Expand Up @@ -365,7 +373,7 @@ async def parse_api_item_data(
category_name = category_record.name
category_internal_name = category_record.internal_name

is_currency = category_internal_name == "Currency"
is_currency = category_internal_name in ["Currency", "Fragment"]
currency_item_metadata = api_item_data.currency_item_metadata

logger.debug(f"received item data for {category_name}, parsing into pydantic instances")
Expand Down Expand Up @@ -393,10 +401,38 @@ async def parse_api_item_data(


async def save_items(item_records: list[Item]) -> bool:
"""Saves a list of Item records to the database."""
"""Saves a list of Item records to the database. Uses `pymongo`'s `UpdateOne` method to apply bulk updates to
items, with the `upsert` flag to update or insert items if they aren't already present."""

item_collection: motor.motor_asyncio.AsyncIOMotorCollection = Item.get_motor_collection() # type: ignore
prepared_item_records = []

try:
await Item.insert_many(item_records)
for item in item_records:
assert item.price_info is not None
serialized_price_info = item.price_info.serialize_price_data()

prepared_item_records.append(
pymongo.UpdateOne(
{"poe_ninja_id": item.poe_ninja_id},
{
"$set": {
"poe_ninja_id": item.poe_ninja_id,
"name": item.name,
"type_": item.type_,
"price_info": serialized_price_info,
"variant": item.variant,
"icon_url": item.icon_url,
"links": item.links,
"updated_time": dt.datetime.now(dt.UTC),
},
},
upsert=True,
)
)

result = await item_collection.bulk_write(prepared_item_records)
logger.info(f"result from bulk saving item records: {result}")
except Exception as exc:
logger.error(f"error saving item records to DB: {exc}")
return False
Expand Down
Loading