Skip to content

Commit

Permalink
Merge pull request #550 from sanger/DPL-199-create-feedback-messages
Browse files Browse the repository at this point in the history
DPL-199: Perform simple message processing from RabbitMQ
  • Loading branch information
sdjmchattie authored Apr 21, 2022
2 parents 2e96295 + 88b8e66 commit 704fd8f
Show file tree
Hide file tree
Showing 26 changed files with 745 additions and 67 deletions.
14 changes: 2 additions & 12 deletions crawler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
from flask_apscheduler import APScheduler

from crawler.constants import SCHEDULER_JOB_ID_RUN_CRAWLER
from crawler.rabbit.background_consumer import BackgroundConsumer
from crawler.types import RabbitServerDetails
from crawler.rabbit.rabbit_stack import RabbitStack

scheduler = APScheduler()

Expand Down Expand Up @@ -58,13 +57,4 @@ def start_rabbit_consumer(app):
]:
return

rabbit_server = RabbitServerDetails(
uses_ssl=app.config["RABBITMQ_SSL"],
host=app.config["RABBITMQ_HOST"],
port=app.config["RABBITMQ_PORT"],
username=app.config["RABBITMQ_USERNAME"],
password=app.config["RABBITMQ_PASSWORD"],
vhost=app.config["RABBITMQ_VHOST"],
)
rabbit_queue = app.config["RABBITMQ_CRUD_QUEUE"]
BackgroundConsumer(rabbit_server, rabbit_queue).start()
RabbitStack().bring_stack_up()
7 changes: 7 additions & 0 deletions crawler/config/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@
RABBITMQ_PASSWORD = "development"
RABBITMQ_VHOST = "heron"
RABBITMQ_CRUD_QUEUE = "heron.crud-operations"
RABBITMQ_FEEDBACK_EXCHANGE = "psd.heron"

###
# RedPanda details
###
REDPANDA_BASE_URI = f"http://{os.environ.get('LOCALHOST', '127.0.0.1')}:8081"
REDPANDA_API_KEY = ""

###
# SFTP details
Expand Down
19 changes: 19 additions & 0 deletions crawler/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,25 @@
# As per Beckman events detailed in https://ssg-confluence.internal.sanger.ac.uk/display/PSDPUB/Cherrypicking+Events
PLATE_EVENT_DESTINATION_CREATED: Final[str] = "lh_beckman_cp_destination_created"

###
# RabbitMQ message keys
###
RABBITMQ_CREATE_FEEDBACK_ORIGIN_PARSING = "parsing"
RABBITMQ_CREATE_FEEDBACK_ORIGIN_PLATE = "plate"
RABBITMQ_CREATE_FEEDBACK_ORIGIN_ROOT = "root"
RABBITMQ_CREATE_FEEDBACK_ORIGIN_SAMPLE = "sample"

RABBITMQ_FIELD_LAB_ID = "labId"
RABBITMQ_FIELD_MESSAGE_UUID = "messageUuid"
RABBITMQ_FIELD_PLATE = "plate"

RABBITMQ_HEADER_KEY_SUBJECT = "subject"
RABBITMQ_HEADER_KEY_VERSION = "version"

RABBITMQ_ROUTING_KEY_CREATE_PLATE_FEEDBACK = "feedback.created.plate"

RABBITMQ_SUBJECT_CREATE_PLATE = "create-plate-map"
RABBITMQ_SUBJECT_CREATE_PLATE_FEEDBACK = "create-plate-map-feedback"

###
# Flask endpoints
Expand Down
17 changes: 16 additions & 1 deletion crawler/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,23 @@ def __str__(self):
return f"DartStateError: {default_message}"


class CherrypickerDataError(Exception):
class CherrypickerDataError(Error):
"""Raised during cherrypicker test data creation. The message is assumed to be user friendly."""

def __init__(self, message):
self.message = message


class TransientRabbitError(Error):
    """Indicates a temporary failure while handling a RabbitMQ message.

    A typical cause is a backing service (for example the database) being
    unreachable. The message is expected to be reprocessed once the issue
    clears, rather than dead-lettered.
    """

    def __init__(self, message):
        """Create the error.

        Arguments:
            message {str} -- A message to log and possibly show to the user/caller.
        """
        self.message = message
Empty file added crawler/processing/__init__.py
Empty file.
111 changes: 111 additions & 0 deletions crawler/processing/create_plate_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import logging

from crawler.config.centres import CENTRE_DATA_SOURCE_RABBITMQ, get_centres_config
from crawler.config.defaults import RABBITMQ_FEEDBACK_EXCHANGE
from crawler.constants import (
CENTRE_KEY_LAB_ID_DEFAULT,
RABBITMQ_CREATE_FEEDBACK_ORIGIN_PARSING,
RABBITMQ_CREATE_FEEDBACK_ORIGIN_PLATE,
RABBITMQ_FIELD_LAB_ID,
RABBITMQ_FIELD_MESSAGE_UUID,
RABBITMQ_FIELD_PLATE,
RABBITMQ_ROUTING_KEY_CREATE_PLATE_FEEDBACK,
RABBITMQ_SUBJECT_CREATE_PLATE_FEEDBACK,
)
from crawler.exceptions import TransientRabbitError
from crawler.rabbit.avro_encoder import AvroEncoder
from crawler.rabbit.messages.create_feedback_message import CreateFeedbackError, CreateFeedbackMessage

LOGGER = logging.getLogger(__name__)


class CreatePlateProcessor:
    """Processes 'create plate map' messages consumed from RabbitMQ.

    Each message is validated and a feedback message describing the outcome is
    published to the feedback exchange.
    """

    def __init__(self, schema_registry, basic_publisher, config):
        """Prepare the processor.

        Arguments:
            schema_registry -- the schema registry used to encode feedback messages.
            basic_publisher -- the publisher used to emit feedback messages.
            config -- the application config, used to look up centre configuration.
        """
        self._encoder = AvroEncoder(schema_registry, RABBITMQ_SUBJECT_CREATE_PLATE_FEEDBACK)
        self._basic_publisher = basic_publisher
        self._config = config

        # Lazily fetched cache of centre configs; reset for every message.
        self._centres = None

    def process(self, message):
        """Validate the message and publish a feedback message for it.

        Returns True when the message was processed without errors; False when
        it should be dead-lettered. Re-raises TransientRabbitError so the
        consumer can retry the message later.
        """
        # Re-fetch the centres for every message so config changes are picked up.
        self._centres = None

        try:
            self._validate_message(message)
        except TransientRabbitError as ex:
            LOGGER.error(f"Transient error while processing message: {ex.message}")
            raise  # Cause the consumer to restart and try this message again. Ideally we will delay the consumer.
        except Exception as ex:
            LOGGER.error(f"Unhandled error while processing message: {type(ex)} {str(ex)}")
            self._publish_feedback(
                message,
                additional_errors=[
                    CreateFeedbackError(
                        origin=RABBITMQ_CREATE_FEEDBACK_ORIGIN_PARSING,
                        description="An unhandled error occurred while processing the message.",
                    )
                ],
            )
            return False  # Send the message to dead letters

        self._publish_feedback(message)
        return len(message.errors) == 0

    @property
    def centres(self):
        """Centre configs accepted via RabbitMQ, fetched from MongoDB on first
        access and cached until the next call to process().

        Raises TransientRabbitError when MongoDB cannot be reached, so the
        message is retried rather than dead-lettered.
        """
        if self._centres is None:
            try:
                self._centres = get_centres_config(self._config, CENTRE_DATA_SOURCE_RABBITMQ)
            except Exception:
                raise TransientRabbitError("Unable to reach MongoDB while getting centres config.")

        return self._centres

    def _publish_feedback(self, message, additional_errors=()):
        """Encode and publish a feedback message summarising the processing outcome.

        Arguments:
            message -- the RabbitMessage being processed.
            additional_errors -- extra CreateFeedbackError entries to report on
                top of those already recorded on the message.
        """
        message_uuid = message.message[RABBITMQ_FIELD_MESSAGE_UUID].decode()

        # Combine the errors into a new list so the message's own error list is
        # not mutated as a side effect of publishing feedback.
        errors = list(message.errors) + list(additional_errors)

        feedback_message = CreateFeedbackMessage(
            sourceMessageUuid=message_uuid,
            countOfTotalSamples=0,
            countOfValidSamples=0,
            operationWasErrorFree=len(errors) == 0,
            errors=errors,
        )

        encoded_message = self._encoder.encode([feedback_message])
        self._basic_publisher.publish_message(
            RABBITMQ_FEEDBACK_EXCHANGE,
            RABBITMQ_ROUTING_KEY_CREATE_PLATE_FEEDBACK,
            encoded_message.body,
            RABBITMQ_SUBJECT_CREATE_PLATE_FEEDBACK,
            encoded_message.version,
        )

    @staticmethod
    def _add_error(message, origin, description, sample_uuid="", field=""):
        """Log a validation error and record it on the message."""
        LOGGER.error(
            f"Error found in message with origin '{origin}', sampleUuid '{sample_uuid}', field '{field}': {description}"
        )
        message.add_error(
            CreateFeedbackError(
                origin=origin,
                sampleUuid=sample_uuid,
                field=field,
                description=description,
            )
        )

    def _validate_message(self, message):
        """Record validation errors on the message for any issues found in its body."""
        body = message.message

        # Check that the message is for a centre we are accepting RabbitMQ messages for.
        lab_id = body[RABBITMQ_FIELD_PLATE][RABBITMQ_FIELD_LAB_ID]
        if lab_id not in [c[CENTRE_KEY_LAB_ID_DEFAULT] for c in self.centres]:
            CreatePlateProcessor._add_error(
                message,
                RABBITMQ_CREATE_FEEDBACK_ORIGIN_PLATE,
                f"The lab ID provided '{lab_id}' is not configured to receive messages via RabbitMQ.",
                field=RABBITMQ_FIELD_LAB_ID,
            )
40 changes: 40 additions & 0 deletions crawler/processing/rabbit_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from crawler.constants import RABBITMQ_HEADER_KEY_SUBJECT, RABBITMQ_HEADER_KEY_VERSION


class RabbitMessage:
    """Wraps a raw RabbitMQ delivery, providing lazy access to its headers and
    AVRO-decoded body, and accumulating errors found while processing it."""

    def __init__(self, headers, encoded_body):
        """Store the raw headers and encoded body for later decoding."""
        self.headers = headers
        self.encoded_body = encoded_body
        self.errors = []

        # Lazily populated values.
        self._subject = None
        self._schema_version = None
        self._decoded_list = None
        self._message = None

    @property
    def subject(self):
        """The message subject, read from the headers on first access."""
        if self._subject is not None:
            return self._subject
        self._subject = self.headers[RABBITMQ_HEADER_KEY_SUBJECT]
        return self._subject

    @property
    def schema_version(self):
        """The schema version, read from the headers on first access."""
        if self._schema_version is not None:
            return self._schema_version
        self._schema_version = self.headers[RABBITMQ_HEADER_KEY_VERSION]
        return self._schema_version

    def decode(self, encoder):
        """Decode the encoded body with the given encoder and store the
        resulting list of messages."""
        decoded = encoder.decode(self.encoded_body, self.schema_version)
        self._decoded_list = list(decoded)

    @property
    def contains_single_message(self):
        """Truthy only when exactly one message was decoded from the body."""
        return self._decoded_list and len(self._decoded_list) == 1

    @property
    def message(self):
        """The first decoded message, or None when nothing was decoded."""
        if self._decoded_list:
            return self._decoded_list[0]
        return None

    def add_error(self, error):
        """Record an error encountered while processing this message."""
        self.errors.append(error)
42 changes: 42 additions & 0 deletions crawler/processing/rabbit_message_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging

from crawler.constants import RABBITMQ_SUBJECT_CREATE_PLATE
from crawler.processing.create_plate_processor import CreatePlateProcessor
from crawler.processing.rabbit_message import RabbitMessage
from crawler.rabbit.avro_encoder import AvroEncoder

LOGGER = logging.getLogger(__name__)


class RabbitMessageProcessor:
    """Decodes RabbitMQ messages and dispatches them to a processor chosen by
    the message subject."""

    def __init__(self, schema_registry, basic_publisher, config):
        """Build the subject-to-processor dispatch table.

        Arguments:
            schema_registry -- the schema registry used to decode messages and
                encode feedback.
            basic_publisher -- the publisher processors use to emit feedback.
            config -- the application config passed through to processors.
        """
        self._schema_registry = schema_registry
        self._basic_publisher = basic_publisher
        self._config = config

        self._processors = {
            RABBITMQ_SUBJECT_CREATE_PLATE: CreatePlateProcessor(
                self._schema_registry, self._basic_publisher, self._config
            )
        }

    def process_message(self, headers, body):
        """Decode and process one message.

        Returns True when the message was handled successfully; False when it
        should be dead-lettered.
        """
        message = RabbitMessage(headers, body)
        try:
            message.decode(AvroEncoder(self._schema_registry, message.subject))
        except Exception as ex:
            LOGGER.error(f"Unrecoverable error while decoding RabbitMQ message: {type(ex)} {str(ex)}")
            return False  # Send the message to dead letters.

        if not message.contains_single_message:
            LOGGER.error("RabbitMQ message received containing multiple AVRO encoded messages.")
            return False  # Send the message to dead letters.

        # Look the processor up before invoking it so that a KeyError raised
        # inside the processor is not mistaken for an unknown subject.
        try:
            processor = self._processors[message.subject]
        except KeyError:
            LOGGER.error(
                f"Received message has subject '{message.subject}'"
                " but there is no implemented processor for this subject."
            )
            return False  # Send the message to dead letters.

        return processor.process(message)
32 changes: 14 additions & 18 deletions crawler/rabbit/async_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@ class AsyncConsumer(object):
commands that were issued and that should surface in the output as well.
"""

def __init__(self, server_details, queue):
def __init__(self, server_details, queue, process_message):
"""Create a new instance of the consumer class, passing in the AMQP
URL used to connect to RabbitMQ.
:param bool use_ssl: Whether to use SSL when connecting to the AMQP endpoint.
:param str host: The AMQP host to connect to.
:param int port: The AMQP port to connect with.
:param str username: The AMQP username with read access to the queue.
:param str password: The AMQP password for the username.
:param str vhost: The AMQP virtual host to consume from.
:param RabbitServerDetails server_details: The RabbitMQ server connection details.
:param str queue: The AMQP queue to consume from.
:param func process_message: A function to call with details of any messages consumed from the queue.
This function will be passed the message headers and the message body and should
return a boolean indicating whether the message was processed successfully (True)
or failed to be processed and should be dead-lettered (False).
"""
self.should_reconnect = False
self.was_consuming = False
Expand All @@ -45,6 +44,7 @@ def __init__(self, server_details, queue):
self._consumer_tag = None
self._server_details = server_details
self._queue = queue
self._process_message = process_message
self._consuming = False
# In production, experiment with higher prefetch values
# for higher consumer throughput
Expand Down Expand Up @@ -248,31 +248,27 @@ def on_consumer_cancelled(self, method_frame):
if self._channel:
self._channel.close()

def on_message(self, _unused_channel, basic_deliver, properties, body):
def on_message(self, channel, basic_deliver, properties, body):
"""Invoked by pika when a message is delivered from RabbitMQ. The
channel is passed for your convenience. The basic_deliver object that
is passed in carries the exchange, routing key, delivery tag and
a redelivered flag for the message. The properties passed in is an
instance of BasicProperties with the message properties and the body
is the message that was sent.
:param pika.channel.Channel _unused_channel: The channel object
:param pika.channel.Channel channel: The channel object
:param pika.Spec.Basic.Deliver: basic_deliver method
:param pika.Spec.BasicProperties: properties
:param bytes body: The message body
"""
LOGGER.info("Received message # %s from %s: %s", basic_deliver.delivery_tag, properties.app_id, body)
self.acknowledge_message(basic_deliver.delivery_tag)
delivery_tag = basic_deliver.delivery_tag

def acknowledge_message(self, delivery_tag):
"""Acknowledge the message delivery from RabbitMQ by sending a
Basic.Ack RPC method for the delivery tag.
:param int delivery_tag: The delivery tag from the Basic.Deliver frame
"""
if self._channel:
if self._process_message(properties.headers, body):
LOGGER.info("Acknowledging message %s", delivery_tag)
self._channel.basic_ack(delivery_tag)
channel.basic_ack(delivery_tag)
else:
LOGGER.error("No channel to perform basic acknowledgement on")
LOGGER.info("Rejecting message %s", delivery_tag)
channel.basic_nack(delivery_tag, requeue=False)

def stop_consuming(self):
"""Tell RabbitMQ that you would like to stop consuming by sending the
Expand Down
Loading

0 comments on commit 704fd8f

Please sign in to comment.