From 46a34cbff137d1c509ed4f90b88eb4e2d58c315a Mon Sep 17 00:00:00 2001 From: Charles Leifer Date: Tue, 16 Jul 2024 22:13:40 -0500 Subject: [PATCH] Remove junk sqs implementation and requirements for utc param in storage. --- huey/api.py | 2 +- huey/contrib/kyototycoon.py | 2 +- huey/contrib/sql_huey.py | 2 +- huey/contrib/sqs.py | 176 ------------------------------------ huey/storage.py | 13 ++- huey/tests/test_storage.py | 2 +- 6 files changed, 10 insertions(+), 187 deletions(-) delete mode 100644 huey/contrib/sqs.py diff --git a/huey/api.py b/huey/api.py index b0f41591..06e17132 100644 --- a/huey/api.py +++ b/huey/api.py @@ -594,7 +594,7 @@ def is_revoked(self, task, timestamp=None, peek=True): def add_schedule(self, task): data = self.serialize_task(task) eta = task.eta or datetime.datetime.fromtimestamp(0) - self.storage.add_to_schedule(data, eta, self.utc) + self.storage.add_to_schedule(data, eta) logger.info('Added task %s to schedule, eta %s', task.id, eta) self._emit(S.SIGNAL_SCHEDULED, task) diff --git a/huey/contrib/kyototycoon.py b/huey/contrib/kyototycoon.py index b7a6d84b..d60f694c 100644 --- a/huey/contrib/kyototycoon.py +++ b/huey/contrib/kyototycoon.py @@ -55,7 +55,7 @@ def flush_queue(self): def convert_ts(self, ts): return int(time.mktime(ts.timetuple())) - def add_to_schedule(self, data, ts, utc): + def add_to_schedule(self, data, ts): self.s.add(data, self.convert_ts(ts)) def read_schedule(self, ts): diff --git a/huey/contrib/sql_huey.py b/huey/contrib/sql_huey.py index 6b4e8377..5503c2ed 100644 --- a/huey/contrib/sql_huey.py +++ b/huey/contrib/sql_huey.py @@ -122,7 +122,7 @@ def enqueued_items(self, limit=None): def flush_queue(self): self.Task.delete().where(self.Task.queue == self.name).execute() - def add_to_schedule(self, data, timestamp, utc): + def add_to_schedule(self, data, timestamp): self.check_conn() self.Schedule.create(queue=self.name, data=data, timestamp=timestamp) diff --git a/huey/contrib/sqs.py b/huey/contrib/sqs.py deleted file mode 100644 index 02010c53..00000000 --- a/huey/contrib/sqs.py +++ /dev/null @@ -1,176 +0,0 @@ -import base64 -import datetime - -import boto3 -from botocore.exceptions import ClientError - -from huey.api import Huey -from huey.exceptions import HueyException -from huey.storage import BaseStorage -from huey.storage import EmptyData -from huey.utils import utcnow - - -""" -EXPERIMENTAL storage layer for SQS and S3. - -* Does not support priorities. -* Does not support at-least-once delivery. -* Limited support for scheduled/delayed execution (max 900 seconds). -* May rack up a lot of API calls? - -Usage: - -huey = SqsHuey( - name='huey', - queue_name='huey_queue', - bucket_name='huey.queue.results', - sqs_settings={'MaximumMessageSize': '262144'}, - s3_settings={'CreateBucketConfiguration': { - 'LocationConstraint': 'us-west-1'}}) - -""" - - -class SqsStorage(BaseStorage): - blocking = True - priority = False - - def __init__(self, name, queue_name=None, bucket_name=None, - sqs_settings=None, s3_settings=None, result_expire_days=30): - super(SqsStorage, self).__init__(name) - self.queue_name = queue_name or ('huey.%s' % queue_name) - self.queue_settings = sqs_settings or {} - self._sqs = boto3.resource('sqs') - self._queue = None - - self.bucket_name = bucket_name or ('huey.%s' % bucket_name) - self.bucket_settings = s3_settings or {} - self._s3 = boto3.resource('s3') - self._bucket = None - self.result_expire_days = result_expire_days - - @property - def queue(self): - if self._queue is None: - try: - self._queue = self._sqs.get_queue_by_name( - QueueName=self.queue_name) - except ClientError: - self._queue = self._sqs.create_queue( - QueueName=self.queue_name, - Attributes=self.queue_settings) - return self._queue - - @property - def bucket(self): - if self._bucket is None: - bucket = self._s3.Bucket(self.bucket_name) - if not bucket.creation_date: - bucket = self._s3.create_bucket( - Bucket=self.bucket_name, - **self.bucket_settings) - self._bucket = bucket - return self._bucket - - def enqueue(self, data, priority=None): - self.queue.send_message( - MessageBody=base64.b64encode(data).decode('ascii')) - - def dequeue(self): - try: - messages = self.queue.receive_messages( - MaxNumberOfMessages=1, - WaitTimeSeconds=20) - if messages: - message, = messages - data = base64.b64decode(message.body) - message.delete() - return data - except ClientError: - pass - - def flush_queue(self): - self.queue.purge() - - def add_to_schedule(self, data, ts, utc): - if utc: - now = utcnow() - else: - now = datetime.datetime.now() - delay_seconds = max(0, (ts - now).total_seconds()) - if delay_seconds > 900: - raise HueyException('SQS does not support delays of greater than ' - '900 seconds.') - self.queue.send_message( - MessageBody=base64.b64encode(data).encode('ascii'), - DelaySeconds=int(delay_seconds)) - - def read_schedule(self, ts): - return [] - - def put_data(self, key, value, is_result=False): - expires = utcnow() + datetime.timedelta(days=self.result_expire_days) - self.bucket.put_object( - Body=value, - Expires=expires, - Key=key) - - def peek_data(self, key): - obj = self.bucket.Object(key=key) - try: - resp = obj.get() - except ClientError: - return EmptyData - else: - return resp['Body'].read() - - def pop_data(self, key): - obj = self.bucket.Object(key=key) - try: - resp = obj.get() - except ClientError: - return EmptyData - else: - data = resp['Body'].read() - obj.delete() - return data - - def delete_data(self, key): - obj = self.bucket.Object(key=key) - try: - obj.delete() - except ClientError: - return False - else: - return True - - def has_data_for_key(self, key): - client = self.s3.meta.client - try: - client.head_object(Bucket=self.bucket.name, Key=key) - except ClientError: - return False - else: - return True - - def put_if_empty(self, key, value): - client = self.s3.meta.client - try: - client.head_object(Bucket=self.bucket.name, Key=key) - except ClientError: - client.put_object(Body=value, Bucket=self.bucket.name, Key=key) - return True - else: - return False - - def flush_results(self): - self.bucket.objects.delete() - - def flush_all(self): - self.flush_queue() - self.flush_results() - - -class SqsHuey(Huey): - storage_class = SqsStorage diff --git a/huey/storage.py b/huey/storage.py index 730dbd23..22147298 100644 --- a/huey/storage.py +++ b/huey/storage.py @@ -100,14 +100,13 @@ def flush_queue(self): """ raise NotImplementedError - def add_to_schedule(self, data, ts, utc): + def add_to_schedule(self, data, ts): """ Add the given task data to the schedule, to be executed at the given timestamp. :param bytes data: Task data. :param datetime ts: Timestamp at which task should be executed. - :param bool utc: Whether huey is in UTC-mode or local mode. :return: No return value. """ raise NotImplementedError @@ -247,7 +246,7 @@ def dequeue(self): pass def queue_size(self): return 0 def enqueued_items(self, limit=None): return [] def flush_queue(self): pass - def add_to_schedule(self, data, ts, utc): pass + def add_to_schedule(self, data, ts): pass def read_schedule(self, ts): return [] def schedule_size(self): return 0 def scheduled_items(self, limit=None): return [] @@ -296,7 +295,7 @@ def enqueued_items(self, limit=None): def flush_queue(self): self._queue = [] - def add_to_schedule(self, data, ts, utc): + def add_to_schedule(self, data, ts): heapq.heappush(self._schedule, (ts, data)) def read_schedule(self, ts): @@ -438,7 +437,7 @@ def enqueued_items(self, limit=None): def flush_queue(self): self.conn.delete(self.queue_key) - def add_to_schedule(self, data, ts, utc): + def add_to_schedule(self, data, ts): self.conn.zadd(self.schedule_key, {data: self.convert_ts(ts)}) def read_schedule(self, ts): @@ -753,7 +752,7 @@ def enqueued_items(self, limit=None): def flush_queue(self): self.sql('delete from task where queue=?', (self.name,), commit=True) - def add_to_schedule(self, data, ts, utc): + def add_to_schedule(self, data, ts): params = (self.name, to_blob(data), to_timestamp(ts)) self.sql('insert into schedule (queue, data, timestamp) ' 'values (?, ?, ?)', params, commit=True) @@ -937,7 +936,7 @@ def _timestamp_to_prefix(self, ts): ts = time.mktime(ts.timetuple()) + (ts.microsecond * 1e-6) return '%012x' % int(ts * 1000) - def add_to_schedule(self, data, ts, utc): + def add_to_schedule(self, data, ts): with self.lock: if not os.path.exists(self.schedule_path): os.makedirs(self.schedule_path) diff --git a/huey/tests/test_storage.py b/huey/tests/test_storage.py index 4a341953..91a3dfc7 100644 --- a/huey/tests/test_storage.py +++ b/huey/tests/test_storage.py @@ -70,7 +70,7 @@ def test_schedule_methods(self): (b'n1', timestamp - second), (b'p2', timestamp + second + second)) for data, ts in items: - self.s.add_to_schedule(data, ts, False) + self.s.add_to_schedule(data, ts) self.assertEqual(self.s.schedule_size(), 4)