class UpdateListener(ABC):
    """A daemon that listens to updates from a Redis stream and processes
    them.

    The class is meant to be subclassed to implement the processing logic.
    Subclasses must implement the `process_redis_update` method.

    :param redis_client: the Redis client used to read the stream and to
        load/store the ID of the latest processed update
    :param redis_stream_name: the name of the Redis stream to read updates
        from
    :param redis_latest_id_key: the Redis key under which the ID of the
        latest processed update is stored
    """

    def __init__(
        self, redis_client: Redis, redis_stream_name: str, redis_latest_id_key: str
    ):
        self.redis_client = redis_client
        self.redis_stream_name = redis_stream_name
        self.redis_latest_id_key = redis_latest_id_key

    def run(self) -> None:
        """Run the update import daemon.

        This daemon listens to the Redis stream containing information about
        product updates, and calls `process_redis_update` for each update
        it receives.

        The ID stored under `redis_latest_id_key` (if any) is passed as
        `min_id` to `get_new_updates`. After each update is handled, its ID
        is written back to that key. An exception raised by
        `process_redis_update` is logged and the ID is still saved, so a
        failing update does not stop the daemon.
        """
        logger.info("Starting update listener daemon")

        logger.info("Redis client: %s", self.redis_client)
        logger.info("Pinging client...")
        self.redis_client.ping()
        logger.info("Connection successful")

        latest_id = self.redis_client.get(self.redis_latest_id_key)
        if isinstance(latest_id, bytes):
            # redis-py returns bytes unless the client was created with
            # `decode_responses=True`. Normalize to str so that the ID can be
            # parsed below (bytes.split("-") raises TypeError) and is handed
            # over to `get_new_updates` in a single, consistent type.
            latest_id = latest_id.decode("utf-8")

        if latest_id:
            # The part of the ID before the dash is parsed as a timestamp in
            # milliseconds, hence the division by 1000.
            logger.info(
                "Latest ID processed: %s (datetime: %s)",
                latest_id,
                datetime.datetime.fromtimestamp(int(latest_id.split("-")[0]) / 1000),
            )
        else:
            logger.info("No latest ID found")

        for redis_update in get_new_updates(
            self.redis_client, stream_name=self.redis_stream_name, min_id=latest_id
        ):
            try:
                self.process_redis_update(redis_update)
            except Exception as e:
                # Best effort: log the failure and move on to the next update
                # instead of crashing the daemon.
                logger.exception(e)
            # Saved even when processing failed (see above).
            self.redis_client.set(self.redis_latest_id_key, redis_update.id)

    def process_updates_since(
        self, since: datetime.datetime, to: Optional[datetime.datetime] = None
    ) -> None:
        """Process all the updates since the given timestamp.

        Unlike `run`, exceptions raised by `process_redis_update` are not
        caught here: they propagate to the caller and interrupt the
        processing. The latest processed ID stored in Redis is not modified.

        :param since: the timestamp to start from
        :param to: the timestamp to stop at (inclusive), defaults to None
            (process all updates)
        """
        logger.info("Redis client: %s", self.redis_client)
        logger.info("Pinging client...")
        self.redis_client.ping()

        processed = 0
        for product_update in get_processed_since(
            self.redis_client, stream_name=self.redis_stream_name, min_id=since
        ):
            # Stop at the first update past the upper bound; this relies on
            # updates being yielded in chronological order -- TODO confirm
            # against `get_processed_since`.
            if to is not None and product_update.timestamp > to:
                break
            self.process_redis_update(product_update)
            processed += 1

        logger.info("Processed %d updates", processed)

    @abstractmethod
    def process_redis_update(self, redis_update: RedisUpdate) -> None:
        """Process a single update read from the Redis stream.

        Must be implemented by subclasses.

        :param redis_update: the update to process
        """