Skip to content

Commit

Permalink
build: Upgrade yapw, to handle RabbitMQ restarts
Browse files Browse the repository at this point in the history
  • Loading branch information
jpmckinney committed Jul 3, 2023
1 parent 8175736 commit 3b7dfc5
Show file tree
Hide file tree
Showing 13 changed files with 44 additions and 52 deletions.
6 changes: 4 additions & 2 deletions process/management/commands/api_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from django.conf import settings
from django.core.management.base import BaseCommand
from django.db import transaction
from yapw.methods.blocking import ack, publish
from yapw.methods import ack, publish

from process.models import Collection
from process.processors.loader import create_collection_file
Expand All @@ -16,7 +16,9 @@

class Command(BaseCommand):
def handle(self, *args, **options):
consume(callback, routing_key, consume_routing_keys, decorator=decorator)
consume(
on_message_callback=callback, queue=routing_key, routing_keys=consume_routing_keys, decorator=decorator
)


def callback(client_state, channel, method, properties, input_message):
Expand Down
6 changes: 4 additions & 2 deletions process/management/commands/checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from yapw.methods.blocking import ack, publish
from yapw.methods import ack, publish

try:
from libcoveocds.api import ocds_json_output
Expand All @@ -28,7 +28,9 @@ def handle(self, *args, **options):
if not using_libcoveocds:
raise CommandError("Checker is unavailable. Install the libcoveocds Python package.")

consume(callback, routing_key, consume_routing_keys, decorator=decorator)
consume(
on_message_callback=callback, queue=routing_key, routing_keys=consume_routing_keys, decorator=decorator
)


def callback(client_state, channel, method, properties, input_message):
Expand Down
6 changes: 4 additions & 2 deletions process/management/commands/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from django.conf import settings
from django.core.management.base import BaseCommand
from yapw.methods.blocking import ack, publish
from yapw.methods import ack, publish

from process.models import Collection, CollectionFile, ProcessingStep, Record
from process.util import RECORD_PACKAGE, RELEASE_PACKAGE, consume, create_step, decorator
Expand All @@ -14,7 +14,9 @@

class Command(BaseCommand):
def handle(self, *args, **options):
consume(callback, routing_key, consume_routing_keys, decorator=decorator)
consume(
on_message_callback=callback, queue=routing_key, routing_keys=consume_routing_keys, decorator=decorator
)


def callback(client_state, channel, method, properties, input_message):
Expand Down
6 changes: 4 additions & 2 deletions process/management/commands/file_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ijson.common import ObjectBuilder
from ocdskit.upgrade import upgrade_10_11
from ocdskit.util import detect_format
from yapw.methods.blocking import ack, nack, publish
from yapw.methods import ack, nack, publish

from process.models import (
CollectionFile,
Expand Down Expand Up @@ -42,7 +42,9 @@

class Command(BaseCommand):
def handle(self, *args, **options):
consume(callback, routing_key, consume_routing_keys, decorator=decorator)
consume(
on_message_callback=callback, queue=routing_key, routing_keys=consume_routing_keys, decorator=decorator
)


def callback(client_state, channel, method, properties, input_message):
Expand Down
6 changes: 4 additions & 2 deletions process/management/commands/finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from django.core.management.base import BaseCommand
from django.db import transaction
from django.db.models.functions import Now
from yapw.methods.blocking import ack
from yapw.methods import ack

from process.models import Collection
from process.util import consume, decorator
Expand All @@ -17,7 +17,9 @@

class Command(BaseCommand):
def handle(self, *args, **options):
consume(callback, routing_key, consume_routing_keys, decorator=decorator)
consume(
on_message_callback=callback, queue=routing_key, routing_keys=consume_routing_keys, decorator=decorator
)


def callback(client_state, channel, method, properties, input_message):
Expand Down
6 changes: 4 additions & 2 deletions process/management/commands/record_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from django.core.management.base import BaseCommand
from django.db import transaction
from ocdskit.util import is_linked_release
from yapw.methods.blocking import ack, publish
from yapw.methods import ack, publish

from process.exceptions import AlreadyExists
from process.models import Collection, CollectionNote, CompiledRelease, ProcessingStep, Record
Expand All @@ -21,7 +21,9 @@ class Command(BaseCommand):
"""

def handle(self, *args, **options):
consume(callback, routing_key, consume_routing_keys, decorator=decorator)
consume(
on_message_callback=callback, queue=routing_key, routing_keys=consume_routing_keys, decorator=decorator
)


def callback(client_state, channel, method, properties, input_message):
Expand Down
6 changes: 4 additions & 2 deletions process/management/commands/release_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from django.core.management.base import BaseCommand
from django.db import transaction
from yapw.methods.blocking import ack, publish
from yapw.methods import ack, publish

from process.exceptions import AlreadyExists
from process.models import Collection, CompiledRelease, ProcessingStep, Release
Expand All @@ -20,7 +20,9 @@ class Command(BaseCommand):
"""

def handle(self, *args, **options):
consume(callback, routing_key, consume_routing_keys, decorator=decorator)
consume(
on_message_callback=callback, queue=routing_key, routing_keys=consume_routing_keys, decorator=decorator
)


def callback(client_state, channel, method, properties, input_message):
Expand Down
6 changes: 4 additions & 2 deletions process/management/commands/wiper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from django.core.management.base import BaseCommand
from django.db import connections
from yapw.methods.blocking import ack
from yapw.methods import ack

from process.models import Collection
from process.util import consume, decorator
Expand All @@ -20,7 +20,9 @@ def bulk_batch_size(self, fields, objs):

class Command(BaseCommand):
def handle(self, *args, **options):
consume(callback, routing_key, consume_routing_keys, decorator=decorator)
consume(
on_message_callback=callback, queue=routing_key, routing_keys=consume_routing_keys, decorator=decorator
)


def callback(client_state, channel, method, properties, input_message):
Expand Down
36 changes: 6 additions & 30 deletions process/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@
from contextlib import contextmanager
from textwrap import fill

import pika.exceptions
import simplejson as json
from django.conf import settings
from django.db import connections
from django.db.utils import IntegrityError
from yapw import clients
from yapw.clients import AsyncConsumer, Blocking
from yapw.decorators import decorate
from yapw.methods.blocking import nack
from yapw.methods import nack

from process.exceptions import AlreadyExists, InvalidFormError
from process.models import Collection, CollectionFile, CollectionNote, ProcessingStep
Expand All @@ -22,6 +21,7 @@
# These must match the output of ocdskit.util.detect_format().
RELEASE_PACKAGE = "release package"
RECORD_PACKAGE = "record package"
YAPW_KWARGS = {"url": settings.RABBIT_URL, "exchange": settings.RABBIT_EXCHANGE_NAME, "prefetch_count": 20}


def wrap(string):
Expand All @@ -48,42 +48,18 @@ def get_hash(data):
).hexdigest()


class Consumer(clients.Threaded, clients.Durable, clients.Blocking, clients.Base):
pass


class Publisher(clients.Durable, clients.Blocking, clients.Base):
pass


def get_client(klass, **kwargs):
return klass(url=settings.RABBIT_URL, exchange=settings.RABBIT_EXCHANGE_NAME, **kwargs)


@contextmanager
def get_publisher():
client = get_client(Publisher)
client = Blocking(**YAPW_KWARGS)
try:
yield client
finally:
client.close()


# https://github.com/pika/pika/blob/master/examples/blocking_consume_recover_multiple_hosts.py
def consume(*args, **kwargs):
while True:
try:
client = get_client(Consumer, prefetch_count=20)
client.consume(*args, **kwargs)
break
# Do not recover if the connection was closed by the broker.
except pika.exceptions.ConnectionClosedByBroker as e: # subclass of AMQPConnectionError
logger.warning(e)
break
# Recover from "Connection reset by peer".
except pika.exceptions.StreamLostError as e: # subclass of AMQPConnectionError
logger.warning(e)
continue
client = AsyncConsumer(*args, **kwargs, **YAPW_KWARGS)
client.start()


def decorator(decode, callback, state, channel, method, properties, body):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ webencodings==0.5.1
# via bleach
xmltodict==0.12.0
# via flattentool
yapw[perf]==0.0.13
yapw[perf]==0.1.1
# via -r requirements_nongpl.txt

# The following packages are considered to be unsafe in a requirements file:
Expand Down
2 changes: 1 addition & 1 deletion requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ xmltodict==0.12.0
# via
# -r requirements.txt
# flattentool
yapw[perf]==0.0.13
yapw[perf]==0.1.1
# via -r requirements.txt

# The following packages are considered to be unsafe in a requirements file:
Expand Down
2 changes: 1 addition & 1 deletion requirements_nongpl.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ urllib3==1.26.15
# via
# requests
# sentry-sdk
yapw[perf]==0.0.13
yapw[perf]==0.1.1
# via -r requirements_nongpl.in

# The following packages are considered to be unsafe in a requirements file:
Expand Down
6 changes: 3 additions & 3 deletions tests/commands/test_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def test_time_invalid(self):
f"data_version '{value}' is not in \"YYYY-MM-DD HH:MM:SS\" format or is an invalid date/time",
)

@patch("yapw.methods.blocking.publish")
@patch("yapw.methods.publish")
def test_source_invalid(self, publish):
with captured_stderr() as stderr:
try:
Expand Down Expand Up @@ -118,7 +118,7 @@ def test_source_invalid_scrapyd_close(self, spiders):
)

@patch("process.scrapyd.spiders")
@patch("yapw.methods.blocking.publish")
@patch("yapw.methods.publish")
def test_source_invalid_scrapyd_force(self, spiders, publish):
spiders.return_value = ["france"]

Expand All @@ -129,7 +129,7 @@ def test_source_invalid_scrapyd_force(self, spiders, publish):
self.fail(f"Unexpected exception {e}")

@patch("process.scrapyd.spiders")
@patch("yapw.methods.blocking.publish")
@patch("yapw.methods.publish")
def test_source_local(self, spiders, publish):
spiders.return_value = ["france"]

Expand Down

0 comments on commit 3b7dfc5

Please sign in to comment.