Skip to content

Commit

Permalink
Merge pull request #415 from Koed00/f-strings
Browse files Browse the repository at this point in the history
Convert to f-strings
  • Loading branch information
Koed00 authored Feb 18, 2020
2 parents a1543c8 + 7c31c24 commit cf1b795
Show file tree
Hide file tree
Showing 14 changed files with 770 additions and 484 deletions.
44 changes: 23 additions & 21 deletions django_q/brokers/disque.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,45 @@

class Disque(Broker):
    """Broker backed by a Disque cluster, driven over the redis protocol.

    Jobs are sent with raw Disque commands (ADDJOB/GETJOB/ACKJOB/...) via
    ``self.connection.execute_command``.
    """

    def enqueue(self, task):
        """Add a packed task to the queue; returns the Disque job id."""
        # NOTE(review): when Conf.RETRY <= 0 the job is submitted with
        # "<RETRY> REPLICATE 1"; otherwise the bare RETRY seconds value is
        # used — confirm this inverted-looking condition is intended upstream.
        retry = Conf.RETRY if Conf.RETRY > 0 else f"{Conf.RETRY} REPLICATE 1"
        return self.connection.execute_command(
            f"ADDJOB {self.list_key} {task} 500 RETRY {retry}"
        ).decode()

    def dequeue(self):
        """Fetch up to Conf.BULK jobs; returns [(job_id, payload)] or None."""
        tasks = self.connection.execute_command(
            f"GETJOB COUNT {Conf.BULK} TIMEOUT 1000 FROM {self.list_key}"
        )
        if tasks:
            # GETJOB replies are (queue, id, body) triples of bytes.
            return [(t[1].decode(), t[2].decode()) for t in tasks]

    def queue_size(self):
        """Return the queue length as reported by QLEN."""
        return self.connection.execute_command(f"QLEN {self.list_key}")

    def acknowledge(self, task_id):
        """Ack a finished job; FASTACK skips cluster-wide ack propagation."""
        command = "FASTACK" if Conf.DISQUE_FASTACK else "ACKJOB"
        return self.connection.execute_command(f"{command} {task_id}")

    def ping(self):
        """Return True when the node answers HELLO."""
        return self.connection.execute_command("HELLO")[0] > 0

    def delete(self, task_id):
        """Remove a single job from the cluster."""
        return self.connection.execute_command(f"DELJOB {task_id}")

    def fail(self, task_id):
        # Failed jobs are simply deleted; Disque keeps no dead-letter queue here.
        return self.delete(task_id)

    def delete_queue(self):
        """Delete every job in this queue; returns the number of jobs found."""
        jobs = self.connection.execute_command(f"JSCAN QUEUE {self.list_key}")[1]
        if jobs:
            # DELJOB accepts multiple space-separated job ids in one call.
            job_ids = " ".join(jid.decode() for jid in jobs)
            self.connection.execute_command(f"DELJOB {job_ids}")
        return len(jobs)

    def info(self):
        """Return a cached 'Disque <version>' description string."""
        if not self._info:
            info = self.connection.info("server")
            self._info = f'Disque {info["disque_version"]}'
        return self._info

    @staticmethod
    def get_connection(list_key=Conf.PREFIX):
        """Connect to the first reachable node in Conf.DISQUE_NODES."""
        # Shuffle so load spreads across nodes between processes.
        random.shuffle(Conf.DISQUE_NODES)
        # find one that works
        for node in Conf.DISQUE_NODES:
            host, port = node.split(":")
            kwargs = {"host": host, "port": port}
            if Conf.DISQUE_AUTH:
                kwargs["password"] = Conf.DISQUE_AUTH
            redis_client = redis.Redis(**kwargs)
            redis_client.decode_responses = True
            try:
                # HELLO doubles as a connectivity probe.
                redis_client.execute_command("HELLO")
                return redis_client
            except redis.exceptions.ConnectionError:
                continue
        raise redis.exceptions.ConnectionError("Could not connect to any Disque nodes")
20 changes: 12 additions & 8 deletions django_q/brokers/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ def get_collection(self):
try:
Conf.MONGO_DB = self.connection.get_default_database().name
except ConfigurationError:
Conf.MONGO_DB = 'django-q'
Conf.MONGO_DB = "django-q"
return self.connection[Conf.MONGO_DB][self.list_key]

def queue_size(self):
    """Return the number of tasks whose lock has expired (available work)."""
    # lock <= timeout horizon means no worker currently holds the task.
    return self.collection.count({"lock": {"$lte": _timeout()}})

def lock_size(self):
    """Return the number of tasks currently locked by a worker."""
    # lock > timeout horizon means the task is claimed and not yet expired.
    return self.collection.count({"lock": {"$gt": _timeout()}})

def purge_queue(self):
    # Purging is identical to deleting the whole queue for this broker.
    return self.delete_queue()
Expand All @@ -46,28 +46,32 @@ def ping(self):

def info(self):
    """Return a cached 'MongoDB <version>' description string."""
    if not self._info:
        # server_info() hits the server once; result is memoized on self.
        self._info = f"MongoDB {self.connection.server_info()['version']}"
    return self._info

def fail(self, task_id):
    # A failed task is simply removed; no dead-letter handling here.
    self.delete(task_id)

def enqueue(self, task):
    """Insert a packed task document; returns its ObjectId as a string.

    The document's ``lock`` is pre-set to the timeout horizon so the task
    immediately counts as available in ``dequeue``/``queue_size``.
    """
    inserted_id = self.collection.insert_one(
        {"payload": task, "lock": _timeout()}
    ).inserted_id
    return str(inserted_id)

def dequeue(self):
    """Atomically claim one available task.

    Returns ``[(task_id, payload)]`` on success; on an empty queue sleeps
    Conf.POLL seconds and returns None.
    """
    # find_one_and_update claims the task atomically by bumping its lock,
    # so two clusters cannot both take the same document.
    task = self.collection.find_one_and_update(
        {"lock": {"$lte": _timeout()}}, {"$set": {"lock": timezone.now()}}
    )
    if task:
        return [(str(task["_id"]), task["payload"])]
    # empty queue, spare the cpu
    sleep(Conf.POLL)

def delete_queue(self):
    # Dropping the collection removes all queued tasks at once.
    return self.collection.drop()

def delete(self, task_id):
    """Remove one task document by its string ObjectId."""
    self.collection.delete_one({"_id": ObjectId(task_id)})

def acknowledge(self, task_id):
    # Acknowledging a finished task just deletes its document.
    return self.delete(task_id)
27 changes: 20 additions & 7 deletions django_q/brokers/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@ def get_connection(list_key=Conf.PREFIX):
return OrmQ.objects.using(Conf.ORM)

def queue_size(self):
return self.get_connection().filter(key=self.list_key, lock__lte=_timeout()).count()
return (
self.get_connection()
.filter(key=self.list_key, lock__lte=_timeout())
.count()
)

def lock_size(self):
    """Return the number of rows currently locked by a worker."""
    return (
        self.get_connection().filter(key=self.list_key, lock__gt=_timeout()).count()
    )

def purge_queue(self):
    # Delete every OrmQ row belonging to this queue key.
    return self.get_connection().filter(key=self.list_key).delete()
Expand All @@ -40,22 +46,30 @@ def ping(self):

def info(self):
    """Return a cached 'ORM <database alias>' description string."""
    if not self._info:
        self._info = f"ORM {Conf.ORM}"
    return self._info

def fail(self, task_id):
    # A failed task is simply removed; no dead-letter handling here.
    self.delete(task_id)

def enqueue(self, task):
    """Create a queue row for the packed task; returns its primary key.

    ``lock`` is pre-set to the timeout horizon so the row immediately
    counts as available work.
    """
    package = self.get_connection().create(
        key=self.list_key, payload=task, lock=_timeout()
    )
    return package.pk

def dequeue(self):
tasks = self.get_connection().filter(key=self.list_key, lock__lt=_timeout())[0:Conf.BULK]
tasks = self.get_connection().filter(key=self.list_key, lock__lt=_timeout())[
0 : Conf.BULK
]
if tasks:
task_list = []
for task in tasks:
if self.get_connection().filter(id=task.id, lock=task.lock).update(lock=timezone.now()):
if (
self.get_connection()
.filter(id=task.id, lock=task.lock)
.update(lock=timezone.now())
):
task_list.append((task.pk, task.payload))
# else don't process, as another cluster has been faster than us on that task
return task_list
Expand All @@ -70,4 +84,3 @@ def delete(self, task_id):

def acknowledge(self, task_id):
    # Acknowledging a finished task just deletes its row.
    return self.delete(task_id)

8 changes: 4 additions & 4 deletions django_q/brokers/redis_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

class Redis(Broker):
def __init__(self, list_key=Conf.PREFIX):
    """Initialize the broker with a namespaced Redis list key."""
    # All queues live under the "django_q:<key>:q" namespace in Redis.
    super(Redis, self).__init__(list_key=f"django_q:{list_key}:q")

def enqueue(self, task):
    # RPUSH appends the packed task to the tail of the Redis list.
    return self.connection.rpush(self.list_key, task)
def ping(self):
    """Return True when the Redis server answers PING.

    Logs and re-raises redis.ConnectionError when the server is unreachable.
    """
    try:
        return self.connection.ping()
    except redis.ConnectionError:
        logger.error("Can not connect to Redis server.")
        # Bare raise re-raises the active exception with its original traceback.
        raise

def info(self):
    """Return a cached 'Redis <version>' description string."""
    if not self._info:
        # INFO server is queried once; result is memoized on self.
        info = self.connection.info("server")
        self._info = f"Redis {info['redis_version']}"
    return self._info

def set_stat(self, key, value, timeout):
Expand Down
Loading

0 comments on commit cf1b795

Please sign in to comment.