Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,9 @@ Development

1. Python >= 3.8
2. Install dependencies `requirements/dev.txt`
3. We use `isort` library to order and format our imports, and we check it using `flake8-isort` library (automatically on `flake8` run).
For convenience you may run `isort .` to order imports.
3. We use `isort` library to order and format our imports, and `black` - to format the code.
We check it using `flake8-isort` and `flake8-black` libraries (automatically on `flake8` run).
For convenience you may run `isort . && black .` to format the code.


Testing
Expand Down
19 changes: 12 additions & 7 deletions dj_cqrs/_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ def _validate_master(cqrs_settings):

def _validate_master_auto_update_fields(master_settings):
if 'CQRS_AUTO_UPDATE_FIELDS' in master_settings:
assert isinstance(master_settings['CQRS_AUTO_UPDATE_FIELDS'], bool), (
'CQRS master CQRS_AUTO_UPDATE_FIELDS must be bool.'
)
assert isinstance(
master_settings['CQRS_AUTO_UPDATE_FIELDS'],
bool,
), 'CQRS master CQRS_AUTO_UPDATE_FIELDS must be bool.'
else:
master_settings['CQRS_AUTO_UPDATE_FIELDS'] = DEFAULT_MASTER_AUTO_UPDATE_FIELDS

Expand All @@ -94,7 +95,8 @@ def _validate_master_message_ttl(master_settings):
# TODO: raise error in 2.0.0
logger.warning(
'Settings CQRS_MESSAGE_TTL=%s is invalid, using default %s.',
message_ttl, DEFAULT_MASTER_MESSAGE_TTL,
message_ttl,
DEFAULT_MASTER_MESSAGE_TTL,
)
master_settings['CQRS_MESSAGE_TTL'] = DEFAULT_MASTER_MESSAGE_TTL
else:
Expand Down Expand Up @@ -167,7 +169,8 @@ def _validate_replica_max_retries(replica_settings):
# TODO: raise error in 2.0.0
logger.warning(
'Replica setting CQRS_MAX_RETRIES=%s is invalid, using default %s.',
max_retries, DEFAULT_REPLICA_MAX_RETRIES,
max_retries,
DEFAULT_REPLICA_MAX_RETRIES,
)
replica_settings['CQRS_MAX_RETRIES'] = DEFAULT_REPLICA_MAX_RETRIES
else:
Expand All @@ -184,7 +187,8 @@ def _validate_replica_retry_delay(replica_settings):
# TODO: raise error in 2.0.0
logger.warning(
'Replica setting CQRS_RETRY_DELAY=%s is invalid, using default %s.',
retry_delay, DEFAULT_REPLICA_RETRY_DELAY,
retry_delay,
DEFAULT_REPLICA_RETRY_DELAY,
)
replica_settings['CQRS_RETRY_DELAY'] = DEFAULT_REPLICA_RETRY_DELAY

Expand All @@ -199,7 +203,8 @@ def _validate_replica_delay_queue_max_size(replica_settings):
# TODO: raise error in 2.0.0
logger.warning(
'Settings delay_queue_max_size=%s is invalid, using default %s.',
max_qsize, DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
max_qsize,
DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
)
max_qsize = DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE

Expand Down
17 changes: 13 additions & 4 deletions dj_cqrs/controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


def consume(payload):
""" Consumer controller.
"""Consumer controller.

:param dj_cqrs.dataclasses.TransportPayload payload: Consumed payload from master service.
"""
Expand All @@ -31,9 +31,14 @@ def consume(payload):


def route_signal_to_replica_model(
signal_type, cqrs_id, instance_data, previous_data=None, meta=None, queue=None,
signal_type,
cqrs_id,
instance_data,
previous_data=None,
meta=None,
queue=None,
):
""" Routes signal to model method to create/update/delete replica instance.
"""Routes signal to model method to create/update/delete replica instance.

:param dj_cqrs.constants.SignalType signal_type: Consumed signal type.
:param str cqrs_id: Replica model CQRS unique identifier.
Expand Down Expand Up @@ -85,6 +90,10 @@ def route_signal_to_replica_model(

logger.error(
'{0}\nCQRS {1} error: pk = {2}, cqrs_revision = {3} ({4}).'.format(
str(e), signal_type, pk_value, cqrs_revision, model_cls.CQRS_ID,
str(e),
signal_type,
pk_value,
cqrs_revision,
model_cls.CQRS_ID,
),
)
2 changes: 1 addition & 1 deletion dj_cqrs/controller/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


def produce(payload):
""" Producer controller.
"""Producer controller.

:param dj_cqrs.dataclasses.TransportPayload payload: TransportPayload.
"""
Expand Down
7 changes: 2 additions & 5 deletions dj_cqrs/dataclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def retries(self):

@retries.setter
def retries(self, value):
assert value >= 0, "Payload retries field should be 0 or positive integer."
assert value >= 0, 'Payload retries field should be 0 or positive integer.'
self.__retries = value

def to_dict(self) -> dict:
Expand Down Expand Up @@ -157,7 +157,4 @@ def is_expired(self):
Returns:
(bool): True if payload is expired, False otherwise.
"""
return (
self.__expires is not None
and self.__expires <= timezone.now()
)
return self.__expires is not None and self.__expires <= timezone.now()
23 changes: 11 additions & 12 deletions dj_cqrs/delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class DelayQueue:

def __init__(self, max_size=None):
if max_size is not None:
assert max_size > 0, "Delay queue max_size should be positive integer."
assert max_size > 0, 'Delay queue max_size should be positive integer.'

self._max_size = max_size
self._queue = PriorityQueue()
Expand Down Expand Up @@ -63,19 +63,18 @@ def put(self, delay_message):
"""
assert isinstance(delay_message, DelayMessage)
if self.full():
raise Full("Delay queue is full")

self._queue.put((
delay_message.eta.timestamp(),
delay_message.delivery_tag,
delay_message,
))
raise Full('Delay queue is full')

self._queue.put(
(
delay_message.eta.timestamp(),
delay_message.delivery_tag,
delay_message,
),
)

def qsize(self):
return self._queue.qsize()

def full(self):
return (
self._max_size is not None
and self.qsize() >= self._max_size
)
return self._max_size is not None and self.qsize() >= self._max_size
50 changes: 35 additions & 15 deletions dj_cqrs/management/commands/cqrs_bulk_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,35 @@ class Command(BaseCommand):

def add_arguments(self, parser):
parser.add_argument(
'--cqrs-id', '-c',
'--cqrs-id',
'-c',
help='CQRS_ID of the master model',
type=str,
required=True,
)
parser.add_argument(
'--output', '-o',
'--output',
'-o',
help='Output file for dumping (- for writing to stdout)',
type=str,
default=None,
)
parser.add_argument(
'--batch', '-b',
'--batch',
'-b',
help='Batch size',
type=int,
default=10000,
)
parser.add_argument(
'--progress', '-p',
'--progress',
'-p',
help='Display progress',
action='store_true',
)
parser.add_argument(
'--force', '-f',
'--force',
'-f',
help='Override output file',
action='store_true',
)
Expand All @@ -63,8 +68,8 @@ def handle(self, *args, **options):
file=sys.stderr,
)
for qs in batch_qs(
model.relate_cqrs_serialization(model._default_manager.order_by().all()),
batch_size=batch_size,
model.relate_cqrs_serialization(model._default_manager.order_by().all()),
batch_size=batch_size,
):
ts = time.time()
cs = counter
Expand All @@ -76,23 +81,38 @@ def handle(self, *args, **options):
)
success_counter += 1
except Exception as e:
print('\nDump record failed for pk={0}: {1}: {2}'.format(
instance.pk, type(e).__name__, str(e),
), file=sys.stderr)
print(
'\nDump record failed for pk={0}: {1}: {2}'.format(
instance.pk,
type(e).__name__,
str(e),
),
file=sys.stderr,
)
if progress:
rate = (counter - cs) / (time.time() - ts)
percent = 100 * counter / db_count
eta = datetime.timedelta(seconds=int((db_count - counter) / rate))
sys.stderr.write(
'\r{0} of {1} processed - {2}% with '
'rate {3:.1f} rps, to go {4} ...{5:20}'.format(
counter, db_count, int(percent), rate, str(eta), ' ',
))
counter,
db_count,
int(percent),
rate,
str(eta),
' ',
),
)
sys.stderr.flush()

print('Done!\n{0} instance(s) saved.\n{1} instance(s) processed.'.format(
success_counter, counter,
), file=sys.stderr)
print(
'Done!\n{0} instance(s) saved.\n{1} instance(s) processed.'.format(
success_counter,
counter,
),
file=sys.stderr,
)

@staticmethod
def _get_model(options):
Expand Down
14 changes: 9 additions & 5 deletions dj_cqrs/management/commands/cqrs_bulk_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,23 @@ class Command(BaseCommand):

def add_arguments(self, parser):
parser.add_argument(
'--input', '-i',
'--input',
'-i',
help='Input file for loading (- for reading from stdin)',
type=str, required=True,
type=str,
required=True,
)
parser.add_argument(
'--clear', '-c',
'--clear',
'-c',
help='Delete existing models',
type=bool,
required=False,
default=False,
)
parser.add_argument(
'--batch', '-b',
'--batch',
'-b',
help='Batch size',
type=int,
default=10000,
Expand Down Expand Up @@ -58,7 +62,7 @@ def handle(self, *args, **options):
try:
model._default_manager.all().delete()
except DatabaseError:
raise CommandError("Delete operation fails!")
raise CommandError('Delete operation fails!')

self._process(f, model, batch_size)

Expand Down
41 changes: 19 additions & 22 deletions dj_cqrs/management/commands/cqrs_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

def consume(**kwargs):
import django

django.setup()

from dj_cqrs.transport import current_transport

try:
current_transport.consume(**kwargs)
except KeyboardInterrupt:
Expand All @@ -34,15 +36,14 @@ def _display_path(path):


class WorkersManager:

def __init__(
self,
consume_kwargs,
workers=1,
reload=False,
ignore_paths=None,
sigint_timeout=5,
sigkill_timeout=1,
self,
consume_kwargs,
workers=1,
reload=False,
ignore_paths=None,
sigint_timeout=5,
sigkill_timeout=1,
):
self.pool = []
self.workers = workers
Expand Down Expand Up @@ -137,10 +138,7 @@ def add_arguments(self, parser):
parser.add_argument(
'--reload',
'-r',
help=(
'Enable reload signal SIGHUP and autoreload '
'on file changes'
),
help=('Enable reload signal SIGHUP and autoreload ' 'on file changes'),
action='store_true',
default=False,
)
Expand Down Expand Up @@ -170,17 +168,16 @@ def add_arguments(self, parser):
)

def handle(
self,
*args,
workers=1,
cqrs_id=None,
reload=False,
ignore_paths=None,
sigint_timeout=5,
sigkill_timeout=1,
**options,
self,
*args,
workers=1,
cqrs_id=None,
reload=False,
ignore_paths=None,
sigint_timeout=5,
sigkill_timeout=1,
**options,
):

paths_to_ignore = None
if ignore_paths:
paths_to_ignore = [Path(p).resolve() for p in ignore_paths.split(',')]
Expand Down
Loading