Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix example #9

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added example/libs/__init__.py
Empty file.
Empty file.
46 changes: 46 additions & 0 deletions example/libs/collectors/collector_a.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Collector A."""

from __future__ import annotations

from datetime import datetime

from .collector_base import CollectorBase
from .utils import df_from_raw_csv

# In-memory sample data for collector A: ten article rows (id, title,
# abstract, published_at) handed to ``df_from_raw_csv`` at import time.
COLLECTOR_A_DB = df_from_raw_csv(
"""
id,title,abstract,published_at
1,"article a 1","my abstract 1","2023-01-01"
2,"article a 2","my abstract 2","2023-02-01"
3,"article a 3","my abstract 3","2023-03-01"
4,"article a 4","my abstract 4","2023-04-01"
5,"article a 5","my abstract 5","2023-05-01"
6,"article a 6","my abstract 6","2023-06-01"
7,"article a 7","my abstract 7","2023-07-01"
8,"article a 8","my abstract 8","2023-08-01"
9,"article a 9","my abstract 9","2023-09-01"
10,"article a 10","my abstract 10","2023-10-01"
"""
)


class CollectorA(CollectorBase):
    """Collector A.

    Serves documents from the module-level ``COLLECTOR_A_DB`` dataframe.
    """

    def get_docs_ids(self, query: str) -> list[str]:
        """Get a list of document IDs.

        Args:
            query: Inclusive date range written as ``"<start>,<end>"``, both
                dates in ``YYYY-MM-DD`` format
                (for example ``"2023-01-01,2023-03-31"``).

        Returns:
            The IDs, as strings, of every row whose ``published_at`` lies
            inside the range.

        Raises:
            ValueError: If ``query`` is not exactly two comma-separated
                ``YYYY-MM-DD`` dates.
        """
        str_dt_start, str_dt_end = query.split(",")
        dt_start = datetime.strptime(str_dt_start.strip(), "%Y-%m-%d")
        dt_end = datetime.strptime(str_dt_end.strip(), "%Y-%m-%d")

        published = COLLECTOR_A_DB["published_at"]
        # Element-wise ``&`` is required: ``and`` would call ``bool()`` on a
        # whole Series, which pandas rejects as ambiguous.
        in_range = (published >= dt_start) & (published <= dt_end)
        # The CSV parser yields integer IDs; hand them out as strings to
        # honour the declared ``list[str]`` contract.
        return COLLECTOR_A_DB.loc[in_range, "id"].astype(str).to_list()

    def get_docs_metadata(self, docs_ids: list[str]) -> list[dict[str, str]]:
        """Get a list of document metadata.

        Args:
            docs_ids: IDs of the documents to look up.

        Returns:
            One metadata record per requested ID, in the same order.

        Raises:
            KeyError: If any of the IDs is unknown.
        """
        return [self.get_single_doc_metadata(doc_id) for doc_id in docs_ids]

    def get_single_doc_metadata(self, doc_id: str) -> dict[str, str]:
        """Get a document metadata.

        Args:
            doc_id: ID of the document; ``"3"`` and ``3`` are both accepted.

        Returns:
            The matching row as a ``column -> value`` mapping, with values
            as stored in the dataframe.

        Raises:
            KeyError: If no row has the given ID.
        """
        # Compare as strings so IDs returned by ``get_docs_ids`` (strings)
        # match the integer ``id`` column.
        matches = COLLECTOR_A_DB[
            COLLECTOR_A_DB["id"].astype(str) == str(doc_id)
        ]
        if matches.empty:
            raise KeyError(f"Document not found: {doc_id!r}")
        return matches.to_dict("records")[0]
46 changes: 46 additions & 0 deletions example/libs/collectors/collector_b.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Collector A."""

from __future__ import annotations

from datetime import datetime

from .collector_base import CollectorBase
from .utils import df_from_raw_csv

# In-memory sample data for this collector: ten "article B" rows (id, title,
# abstract, published_at) handed to ``df_from_raw_csv`` at import time.
# NOTE(review): this is collector_b.py, yet the constant name and the module
# docstring still say "A" -- looks like a copy/paste leftover; confirm and
# rename together with every reference in this module.
COLLECTOR_A_DB = df_from_raw_csv(
"""
id,title,abstract,published_at
1,"article B 1","my abstract 1","2023-01-01"
2,"article B 2","my abstract 2","2023-02-01"
3,"article B 3","my abstract 3","2023-03-01"
4,"article B 4","my abstract 4","2023-04-01"
5,"article B 5","my abstract 5","2023-05-01"
6,"article B 6","my abstract 6","2023-06-01"
7,"article B 7","my abstract 7","2023-07-01"
8,"article B 8","my abstract 8","2023-08-01"
9,"article B 9","my abstract 9","2023-09-01"
10,"article B 10","my abstract 10","2023-10-01"
"""
)


class CollectorA(CollectorBase):
    """Collector for the "article B" sample data.

    Serves documents from the module-level ``COLLECTOR_A_DB`` dataframe.

    NOTE(review): the class is still called ``CollectorA`` although it lives
    in collector_b.py -- presumably a copy/paste leftover; the name is kept
    here so existing imports keep working.
    """

    def get_docs_ids(self, query: str) -> list[str]:
        """Get a list of document IDs.

        Args:
            query: Inclusive date range written as ``"<start>,<end>"``, both
                dates in ``YYYY-MM-DD`` format
                (for example ``"2023-01-01,2023-03-31"``).

        Returns:
            The IDs, as strings, of every row whose ``published_at`` lies
            inside the range.

        Raises:
            ValueError: If ``query`` is not exactly two comma-separated
                ``YYYY-MM-DD`` dates.
        """
        str_dt_start, str_dt_end = query.split(",")
        dt_start = datetime.strptime(str_dt_start.strip(), "%Y-%m-%d")
        dt_end = datetime.strptime(str_dt_end.strip(), "%Y-%m-%d")

        published = COLLECTOR_A_DB["published_at"]
        # Element-wise ``&`` is required: ``and`` would call ``bool()`` on a
        # whole Series, which pandas rejects as ambiguous.
        in_range = (published >= dt_start) & (published <= dt_end)
        # The CSV parser yields integer IDs; hand them out as strings to
        # honour the declared ``list[str]`` contract.
        return COLLECTOR_A_DB.loc[in_range, "id"].astype(str).to_list()

    def get_docs_metadata(self, docs_ids: list[str]) -> list[dict[str, str]]:
        """Get a list of document metadata.

        Args:
            docs_ids: IDs of the documents to look up.

        Returns:
            One metadata record per requested ID, in the same order.

        Raises:
            KeyError: If any of the IDs is unknown.
        """
        return [self.get_single_doc_metadata(doc_id) for doc_id in docs_ids]

    def get_single_doc_metadata(self, doc_id: str) -> dict[str, str]:
        """Get a document metadata.

        Args:
            doc_id: ID of the document; ``"3"`` and ``3`` are both accepted.

        Returns:
            The matching row as a ``column -> value`` mapping, with values
            as stored in the dataframe.

        Raises:
            KeyError: If no row has the given ID.
        """
        # Compare as strings so IDs returned by ``get_docs_ids`` (strings)
        # match the integer ``id`` column.
        matches = COLLECTOR_A_DB[
            COLLECTOR_A_DB["id"].astype(str) == str(doc_id)
        ]
        if matches.empty:
            raise KeyError(f"Document not found: {doc_id!r}")
        return matches.to_dict("records")[0]
15 changes: 15 additions & 0 deletions example/libs/collectors/collector_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import annotations


class CollectorBase:
    """Base interface for document collectors.

    Every method here raises ``NotImplementedError``; concrete collectors
    override them. ``NotImplementedError`` is a subclass of ``Exception``,
    so callers that caught the previous bare ``Exception`` still work.
    """

    def get_docs_ids(self, query: str) -> list[str]:
        """Get a list of document IDs.

        Args:
            query: Collector-specific query string.

        Raises:
            NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("Not implemented yet")

    def get_docs_metadata(self, doc_id: str) -> list[dict[str, str]]:
        """Get a list of document metadata.

        NOTE(review): the collectors in this example override this method
        with a ``docs_ids: list[str]`` parameter; the name and annotation
        here are left untouched for compatibility -- confirm and align.

        Raises:
            NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("Not implemented yet")

    def get_single_doc_metadata(self, doc_id: str) -> dict[str, str]:
        """Get a document metadata.

        This is the per-document hook the concrete collectors implement.

        Args:
            doc_id: ID of the document to look up.

        Raises:
            NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("Not implemented yet")

    def get_doc_metadata(self, doc_id: str) -> dict[str, str]:
        """Get a document metadata.

        Kept for backward compatibility; forwards to
        ``get_single_doc_metadata`` so subclasses that only implement that
        hook are reachable through this name as well.

        Args:
            doc_id: ID of the document to look up.

        Raises:
            NotImplementedError: If ``get_single_doc_metadata`` is not
                overridden.
        """
        return self.get_single_doc_metadata(doc_id)
46 changes: 46 additions & 0 deletions example/libs/collectors/collector_c.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Collector A."""

from __future__ import annotations

from datetime import datetime

from .collector_base import CollectorBase
from .utils import df_from_raw_csv

# In-memory sample data for this collector: ten "article C" rows (id, title,
# abstract, published_at) handed to ``df_from_raw_csv`` at import time.
# NOTE(review): this is collector_c.py, yet the constant name and the module
# docstring still say "A" -- looks like a copy/paste leftover; confirm and
# rename together with every reference in this module.
COLLECTOR_A_DB = df_from_raw_csv(
"""
id,title,abstract,published_at
1,"article C 1","my abstract 1","2023-01-01"
2,"article C 2","my abstract 2","2023-02-01"
3,"article C 3","my abstract 3","2023-03-01"
4,"article C 4","my abstract 4","2023-04-01"
5,"article C 5","my abstract 5","2023-05-01"
6,"article C 6","my abstract 6","2023-06-01"
7,"article C 7","my abstract 7","2023-07-01"
8,"article C 8","my abstract 8","2023-08-01"
9,"article C 9","my abstract 9","2023-09-01"
10,"article C 10","my abstract 10","2023-10-01"
"""
)


class CollectorA(CollectorBase):
    """Collector for the "article C" sample data.

    Serves documents from the module-level ``COLLECTOR_A_DB`` dataframe.

    NOTE(review): the class is still called ``CollectorA`` although it lives
    in collector_c.py -- presumably a copy/paste leftover; the name is kept
    here so existing imports keep working.
    """

    def get_docs_ids(self, query: str) -> list[str]:
        """Get a list of document IDs.

        Args:
            query: Inclusive date range written as ``"<start>,<end>"``, both
                dates in ``YYYY-MM-DD`` format
                (for example ``"2023-01-01,2023-03-31"``).

        Returns:
            The IDs, as strings, of every row whose ``published_at`` lies
            inside the range.

        Raises:
            ValueError: If ``query`` is not exactly two comma-separated
                ``YYYY-MM-DD`` dates.
        """
        str_dt_start, str_dt_end = query.split(",")
        dt_start = datetime.strptime(str_dt_start.strip(), "%Y-%m-%d")
        dt_end = datetime.strptime(str_dt_end.strip(), "%Y-%m-%d")

        published = COLLECTOR_A_DB["published_at"]
        # Element-wise ``&`` is required: ``and`` would call ``bool()`` on a
        # whole Series, which pandas rejects as ambiguous.
        in_range = (published >= dt_start) & (published <= dt_end)
        # The CSV parser yields integer IDs; hand them out as strings to
        # honour the declared ``list[str]`` contract.
        return COLLECTOR_A_DB.loc[in_range, "id"].astype(str).to_list()

    def get_docs_metadata(self, docs_ids: list[str]) -> list[dict[str, str]]:
        """Get a list of document metadata.

        Args:
            docs_ids: IDs of the documents to look up.

        Returns:
            One metadata record per requested ID, in the same order.

        Raises:
            KeyError: If any of the IDs is unknown.
        """
        return [self.get_single_doc_metadata(doc_id) for doc_id in docs_ids]

    def get_single_doc_metadata(self, doc_id: str) -> dict[str, str]:
        """Get a document metadata.

        Args:
            doc_id: ID of the document; ``"3"`` and ``3`` are both accepted.

        Returns:
            The matching row as a ``column -> value`` mapping, with values
            as stored in the dataframe.

        Raises:
            KeyError: If no row has the given ID.
        """
        # Compare as strings so IDs returned by ``get_docs_ids`` (strings)
        # match the integer ``id`` column.
        matches = COLLECTOR_A_DB[
            COLLECTOR_A_DB["id"].astype(str) == str(doc_id)
        ]
        if matches.empty:
            raise KeyError(f"Document not found: {doc_id!r}")
        return matches.to_dict("records")[0]
13 changes: 13 additions & 0 deletions example/libs/collectors/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Helper functions."""

from io import StringIO

import pandas as pd


def df_from_raw_csv(raw_data: str) -> pd.DataFrame:
    """Parse CSV text into a dataframe with a datetime ``published_at``.

    The text must contain a ``published_at`` column holding ``YYYY-MM-DD``
    dates; that column is converted with ``pd.to_datetime``.
    """
    with StringIO(raw_data) as buffer:
        frame = pd.read_csv(buffer)
    published = pd.to_datetime(frame["published_at"], format="%Y-%m-%d")
    frame["published_at"] = published
    return frame
Empty file.
66 changes: 66 additions & 0 deletions example/tasks/collectors/collector_a.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""My retsu tasks."""

from __future__ import annotations

from time import sleep

import celery

from retsu import ResultTask
from retsu.celery import SerialCeleryTask

from .config import app, redis_client


@app.task
def task_serial_a1(a: int, b: int, task_id: str) -> int:  # type: ignore
    """Add ``a`` and ``b`` after a delay and store the sum in Redis.

    Sleeps for ``a + b`` seconds, prints a progress message, writes the sum
    under the key ``result-<task_id>`` and returns it.
    """
    total = a + b
    sleep(total)
    print("running task a1")
    redis_client.set(f"result-{task_id}", total)
    return total


@app.task
def task_serial_a2(task_id: str) -> int:  # type: ignore
    """Print a progress message and return the Redis value for the task.

    The value is read from the key ``result-<task_id>`` and returned exactly
    as the Redis client delivers it.
    """
    print("running task a2")
    return redis_client.get(f"result-{task_id}")


@app.task
def task_serial_final(results, task_id: str) -> int:  # type: ignore
    """Build, print, persist and return the final result message.

    Reads the value stored under ``result-<task_id>`` in Redis, formats it
    as ``"Final result: <value>"``, saves that text through ``ResultTask``
    and returns it. ``results`` is accepted but not used.
    """
    print("running final task")

    stored = redis_client.get(f"result-{task_id}")
    message = f"Final result: {stored}"
    print(message)

    ResultTask().save(task_id=task_id, result=message)
    return message


class MySerialTask1(SerialCeleryTask):
    """Serial task that wires the three ``task_serial_*`` functions together."""

    def request(self, a: int, b: int) -> str:
        """Forward ``a`` and ``b`` as keyword arguments to the parent ``request``."""
        return super().request(a=a, b=b)

    def get_chord_tasks(
        self, a: int, b: int, task_id: str
    ) -> tuple[list[celery.Signature], celery.Signature]:
        """Return the task signatures and the final signature for the chord."""
        chord_tasks = [
            task_serial_a1.s(a, b, task_id),
            task_serial_a2.s(task_id),
        ]
        final_task = task_serial_final.s(task_id)
        return chord_tasks, final_task
66 changes: 66 additions & 0 deletions example/tasks/collectors/collector_b.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""My retsu tasks."""

from __future__ import annotations

from time import sleep

import celery

from retsu import ResultTask
from retsu.celery import SerialCeleryTask

from .config import app, redis_client


@app.task
def task_serial_a1(a: int, b: int, task_id: str) -> int:  # type: ignore
    """Add ``a`` and ``b`` after a delay and store the sum in Redis.

    Sleeps for ``a + b`` seconds, prints a progress message, writes the sum
    under the key ``result-<task_id>`` and returns it.
    """
    total = a + b
    sleep(total)
    print("running task a1")
    redis_client.set(f"result-{task_id}", total)
    return total


@app.task
def task_serial_a2(task_id: str) -> int:  # type: ignore
    """Print a progress message and return the Redis value for the task.

    The value is read from the key ``result-<task_id>`` and returned exactly
    as the Redis client delivers it.
    """
    print("running task a2")
    return redis_client.get(f"result-{task_id}")


@app.task
def task_serial_final(results, task_id: str) -> int:  # type: ignore
    """Build, print, persist and return the final result message.

    Reads the value stored under ``result-<task_id>`` in Redis, formats it
    as ``"Final result: <value>"``, saves that text through ``ResultTask``
    and returns it. ``results`` is accepted but not used.
    """
    print("running final task")

    stored = redis_client.get(f"result-{task_id}")
    message = f"Final result: {stored}"
    print(message)

    ResultTask().save(task_id=task_id, result=message)
    return message


class MySerialTask1(SerialCeleryTask):
    """Serial task that wires the three ``task_serial_*`` functions together."""

    def request(self, a: int, b: int) -> str:
        """Forward ``a`` and ``b`` as keyword arguments to the parent ``request``."""
        return super().request(a=a, b=b)

    def get_chord_tasks(
        self, a: int, b: int, task_id: str
    ) -> tuple[list[celery.Signature], celery.Signature]:
        """Return the task signatures and the final signature for the chord."""
        chord_tasks = [
            task_serial_a1.s(a, b, task_id),
            task_serial_a2.s(task_id),
        ]
        final_task = task_serial_final.s(task_id)
        return chord_tasks, final_task
66 changes: 66 additions & 0 deletions example/tasks/collectors/collector_c.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""My retsu tasks."""

from __future__ import annotations

from time import sleep

import celery

from retsu import ResultTask
from retsu.celery import SerialCeleryTask

from .config import app, redis_client


@app.task
def task_serial_a1(a: int, b: int, task_id: str) -> int:  # type: ignore
    """Add ``a`` and ``b`` after a delay and store the sum in Redis.

    Sleeps for ``a + b`` seconds, prints a progress message, writes the sum
    under the key ``result-<task_id>`` and returns it.
    """
    total = a + b
    sleep(total)
    print("running task a1")
    redis_client.set(f"result-{task_id}", total)
    return total


@app.task
def task_serial_a2(task_id: str) -> int:  # type: ignore
    """Print a progress message and return the Redis value for the task.

    The value is read from the key ``result-<task_id>`` and returned exactly
    as the Redis client delivers it.
    """
    print("running task a2")
    return redis_client.get(f"result-{task_id}")


@app.task
def task_serial_final(results, task_id: str) -> int:  # type: ignore
    """Build, print, persist and return the final result message.

    Reads the value stored under ``result-<task_id>`` in Redis, formats it
    as ``"Final result: <value>"``, saves that text through ``ResultTask``
    and returns it. ``results`` is accepted but not used.
    """
    print("running final task")

    stored = redis_client.get(f"result-{task_id}")
    message = f"Final result: {stored}"
    print(message)

    ResultTask().save(task_id=task_id, result=message)
    return message


class MySerialTask1(SerialCeleryTask):
    """Serial task that wires the three ``task_serial_*`` functions together."""

    def request(self, a: int, b: int) -> str:
        """Forward ``a`` and ``b`` as keyword arguments to the parent ``request``."""
        return super().request(a=a, b=b)

    def get_chord_tasks(
        self, a: int, b: int, task_id: str
    ) -> tuple[list[celery.Signature], celery.Signature]:
        """Return the task signatures and the final signature for the chord."""
        chord_tasks = [
            task_serial_a1.s(a, b, task_id),
            task_serial_a2.s(task_id),
        ]
        final_task = task_serial_final.s(task_id)
        return chord_tasks, final_task
Loading
Loading