Optimize Initial Path of Exile Script (#10)
* refactor: refactor poe initial script's data fetching-parsing flow

- add API base url as constant
- add docstrings for save items and write to disk functions
- create function to get category by name
- add functions to get and parse/serialize item api data

* refactor: switch to producer-consumer implementation for poe initial script

- read parsed item data from queue and bulk insert into DB
- parse items and add them to a list, pushing the list to the queue once it reaches a certain size
- concurrently run producer-consumer code using task groups (a minimal sketch of this flow follows below)
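For context, the producer-consumer flow described above looks roughly like the sketch below. This is a minimal, hypothetical example using asyncio queues, batch flushing, a sentinel value, and a task group; the names and batch size are illustrative, not the script's actual ones, and it requires Python 3.11+ for asyncio.TaskGroup.

import asyncio
from asyncio import Queue

BATCH_SIZE = 4  # illustrative batch size; the real script batches far more records


async def producer(queue: Queue[list[int] | None]) -> None:
    """Fetch and parse data, pushing it to the queue in fixed-size batches."""
    batch: list[int] = []
    for record in range(10):  # stands in for fetching and parsing API items
        batch.append(record)
        if len(batch) >= BATCH_SIZE:
            await queue.put(batch)
            batch = []

    await queue.put(batch)  # flush the final partial batch
    await queue.put(None)   # sentinel value signals the consumer to stop


async def consumer(queue: Queue[list[int] | None]) -> None:
    """Read batches from the queue and persist them (a bulk DB insert in the real script)."""
    while True:
        batch = await queue.get()
        if batch is None:
            break
        print(f"saving batch of {len(batch)} records")


async def main() -> None:
    queue: Queue[list[int] | None] = Queue()
    async with asyncio.TaskGroup() as tg:  # runs producer and consumer concurrently
        tg.create_task(producer(queue))
        tg.create_task(consumer(queue))


asyncio.run(main())

The actual script chains two queues (raw API payloads, then parsed Item records) and bulk-inserts each batch into MongoDB, but the batching, sentinel handling, and task-group wiring follow this same shape.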
dhruv-ahuja authored May 19, 2024
1 parent b10e9e7 commit 4f1f360
Showing 1 changed file with 146 additions and 44 deletions.
190 changes: 146 additions & 44 deletions src/scripts/poe_initial.py
@@ -1,11 +1,12 @@
import asyncio
from asyncio import Queue
from dataclasses import dataclass
import json
import os
import time
from typing import Any, cast

from httpx import AsyncClient
from httpx import AsyncClient, RequestError
from loguru import logger
from pydantic import BaseModel, Field
import pydantic
@@ -96,14 +97,24 @@ class ItemEntity(BaseModel):
}


async def save_update_categories():
API_BASE_URL = "https://poe.ninja/api/data"

BATCH_INSERT_LIMIT = 15_000


async def save_item_categories():
"""Iterates over the nested category group hashmap and Saves information for individual item categories into the
database."""

for group, categories in CATEGORY_GROUP_MAP.items():
for category in categories:
item_category = ItemCategory(name=category.name, internal_name=category.internal_name, group=group)
await ItemCategory.save(item_category)


def write_item_data_to_disk(group: str, category_name: str, data: dict[str, Any]):
"""Writes item API data to disk, using the group and category names to define the JSON file path."""

base_path = f"itemData/{group}"
file_path = f"{base_path}/{category_name}.json"

@@ -117,61 +128,103 @@ def write_item_data_to_disk(group: str, category_name: str, data: dict[str, Any]):
json.dump(data, f, indent=4)


async def get_item_data() -> dict[str, list[dict]]:
category_item_mapping = {}

async with AsyncClient(base_url="https://poe.ninja/api/data") as client:
async def prepare_api_data(
api_item_data_queue: Queue[tuple[ItemCategory, list[dict]] | None], item_data_queue: Queue[list[Item] | None]
):
start = time.perf_counter()
async with AsyncClient(base_url=API_BASE_URL) as client:
for _, categories in CATEGORY_GROUP_MAP.items():
for category in categories:
internal_category_name = category.internal_name
category_name = category.name

logger.debug(f"fetching data for {category_name}")
category_record = await get_category_by_name(category_name)
if category_record is None:
logger.error(f"DB record for {category.name} wasn't found, skipping process!")
continue

api_endpoint = "currencyoverview" if internal_category_name == "Currency" else "itemoverview"
url = f"/{api_endpoint}?league=Necropolis&type={category.internal_name}"
res = await client.get(url)
logger.debug(f"category: {internal_category_name}, status_code: {res.status_code}")
logger.debug(f"getting api data for {category_name}")
api_item_data = await get_item_api_data(internal_category_name, client)

# TODO: parse and pass currency icons separately
item_data: list[dict] = res.json()["lines"]
category_item_mapping[category_name] = item_data
api_item_data_payload = (category_record, api_item_data)
await api_item_data_queue.put(api_item_data_payload)

time.sleep(0.1)
logger.debug(f"pushed api data for {category_name}")

return category_item_mapping
# push sentinel value to indicate end of production
await api_item_data_queue.put(None)

stop = time.perf_counter()
logger.info(f"total execution time for preparing api data: {stop - start}")

# TODO: create asyncio tasks and divide categories' data into separate threads for concurrent insertions;
# TODO: divide Base Types into several tasks due to large size (~20k)
async def save_item_data(category_item_mapping: dict[str, list[dict]]) -> None:
logger.debug("starting to save item data in DB")
items = []

for category, item_data in category_item_mapping.items():
start = time.perf_counter()
logger.debug(f"serializing data for {category}")
async def get_category_by_name(name: str) -> ItemCategory | None:
"""Gets an `ItemCategory` document from the database by its name."""

category_record = await ItemCategory.find(ItemCategory.name == category).first_or_none()
if category_record is None:
logger.error(f"DB record for {category} wasn't found!")
continue
try:
category_record = await ItemCategory.find(ItemCategory.name == name).first_or_none()
except Exception as exc:
logger.error(f"error getting category by name '{name}': {exc} ")
return

is_currency = category == "Currency"
return category_record

for entity in item_data:
try:
if is_currency:
item_entity = CurrencyItemEntity(**entity)
else:
item_entity = ItemEntity(**entity)

async def get_item_api_data(internal_category_name: str, client: AsyncClient) -> list[dict[str, Any]]:
"""Gets data for all Items belonging to a category from the apt Poe Ninja API by preparing and calling the API
endpoint, then parsing and returning the item data for the category."""

api_endpoint = "currencyoverview" if internal_category_name == "Currency" else "itemoverview"
url = f"/{api_endpoint}?league=Necropolis&type={internal_category_name}"

try:
response = await client.get(url)
logger.debug(f"category: {internal_category_name}, status_code: {response.status_code}")
except RequestError as exc:
logger.error(f"error fetching data for '{internal_category_name}' with endpoint '{api_endpoint}': {exc}")
item_data = []
else:
# TODO: parse and pass currency icons separately
item_data: list[dict] = response.json()["lines"]

return item_data


async def parse_api_item_data(
api_item_data_queue: Queue[tuple[ItemCategory, list[dict]] | None], item_data_queue: Queue[list[Item] | None]
) -> None:
"""Fetches item API data from the respective queue, and parses it into apt Pydantic model instances, hence
structuring each item in the list and validating its values.
Pushes the structured data records to the item data queue once done."""

item_records: list[Item] = []

start = time.perf_counter()
while True:
api_item_data_payload = await api_item_data_queue.get()

if api_item_data_payload is None:
# no more data to process, push current item data, sentinel value and exit
await item_data_queue.put(item_records)
await item_data_queue.put(None)
break

category_record, api_item_data = api_item_data_payload
category_name = category_record.name
category_internal_name = category_record.internal_name

is_currency = category_internal_name == "Currency"
logger.debug(f"received item data for {category_name}, parsing into pydantic instances")

for api_item_entity in api_item_data:
try:
item_entity = CurrencyItemEntity(**api_item_entity) if is_currency else ItemEntity(**api_item_entity)
except pydantic.ValidationError as exc:
if is_currency:
name = entity["currencyTypeName"]
name = api_item_entity.get("currencyTypeName")
item_type = "currency"
else:
name = entity["name"]
name = api_item_entity.get("name")
item_type = "item"

logger.error(f"error parsing '{name}' {item_type} entity data: {exc}")
Expand Down Expand Up @@ -210,20 +263,69 @@ async def save_item_data(category_item_mapping: dict[str, list[dict]]) -> None:
variant=item_entity.variant,
)

items.append(item_record)
item_records.append(item_record)

# push this batch of item records and reset the list container
if len(item_records) >= BATCH_INSERT_LIMIT:
await item_data_queue.put(item_records)
item_records = []

logger.debug(f"parsed all entities for {category_name}")

await Item.insert_many(items)
stop = time.perf_counter()
execution_time = stop - start
logger.info(f"total execution time for parsing api data: {stop - start}")


async def save_items(item_records: list[Item]) -> bool:
"""Saves a list of Item records to the database."""

try:
await Item.insert_many(item_records)
except Exception as exc:
logger.error(f"error saving item records to DB: {exc}")
return False

return True

logger.debug(f"time taken for category: {category}: {execution_time}")

async def save_item_data(item_data_queue: Queue[list[Item] | None]) -> None:
"""Gets item records in bulk from the item data queue and saves them to the database."""

batch_save_count = 1
logger.debug("initiating saving items to DB")

start = time.perf_counter()
while True:
item_records = await item_data_queue.get()
if item_records is None:
logger.debug("consumer received exit signal")
break

logger.debug(f"saving batch {batch_save_count} of items to DB")
save_was_success = await save_items(item_records)

if save_was_success:
logger.debug(f"successfully saved batch {batch_save_count} of items to DB")
else:
logger.debug(f"failed to save batch {batch_save_count} of items to DB")

batch_save_count += 1

stop = time.perf_counter()
logger.info(f"time taken to save all records to DB: {stop - start}")


async def main():
await connect_to_mongodb(document_models)
await save_update_categories()
category_items_mapping = await get_item_data()
await save_item_data(category_items_mapping)
await save_item_categories()

api_item_data_queue: Queue[tuple[ItemCategory, list[dict]] | None] = Queue()
item_data_queue: Queue[list[Item] | None] = Queue()

async with asyncio.TaskGroup() as tg:
tg.create_task(prepare_api_data(api_item_data_queue, item_data_queue))
tg.create_task(parse_api_item_data(api_item_data_queue, item_data_queue))
tg.create_task(save_item_data(item_data_queue))


if __name__ == "__main__":
Expand Down
