From 901016e32d4b2c3e5a7036f20c45cfd18feec46c Mon Sep 17 00:00:00 2001 From: Rebecca Graber Date: Fri, 25 Feb 2022 16:34:18 -0500 Subject: [PATCH 1/3] feat: rm kafka_consumer --- lms/envs/common.py | 3 - openedx/core/djangoapps/kafka_consumer/README | 17 --- .../djangoapps/kafka_consumer/__init__.py | 0 .../core/djangoapps/kafka_consumer/apps.py | 7 -- .../djangoapps/kafka_consumer/consumers.py | 99 --------------- .../docs/0001_kafka_consumer_command.rst | 30 ----- .../kafka_consumer/management/__init__.py | 0 .../management/commands/__init__.py | 0 .../management/commands/consume_events.py | 118 ------------------ 9 files changed, 274 deletions(-) delete mode 100644 openedx/core/djangoapps/kafka_consumer/README delete mode 100644 openedx/core/djangoapps/kafka_consumer/__init__.py delete mode 100644 openedx/core/djangoapps/kafka_consumer/apps.py delete mode 100644 openedx/core/djangoapps/kafka_consumer/consumers.py delete mode 100644 openedx/core/djangoapps/kafka_consumer/docs/0001_kafka_consumer_command.rst delete mode 100644 openedx/core/djangoapps/kafka_consumer/management/__init__.py delete mode 100644 openedx/core/djangoapps/kafka_consumer/management/commands/__init__.py delete mode 100644 openedx/core/djangoapps/kafka_consumer/management/commands/consume_events.py diff --git a/lms/envs/common.py b/lms/envs/common.py index 44d22a5965c6..ad6c6b75ec9d 100644 --- a/lms/envs/common.py +++ b/lms/envs/common.py @@ -3213,9 +3213,6 @@ def _make_locale_paths(settings): # pylint: disable=missing-function-docstring # For save for later 'lms.djangoapps.save_for_later', - # TODO (EventBus): Make Kafka/event-bus optional - 'openedx.core.djangoapps.kafka_consumer', - # Course Live App 'openedx.core.djangoapps.course_live', ] diff --git a/openedx/core/djangoapps/kafka_consumer/README b/openedx/core/djangoapps/kafka_consumer/README deleted file mode 100644 index 36453a10b9cc..000000000000 --- a/openedx/core/djangoapps/kafka_consumer/README +++ /dev/null @@ -1,17 +0,0 @@ - -=============== -Kafka Consumer -=============== - -Purpose -======= - -This is a (likely temporary) app created to test and iterate on event bus consumer patterns. The goal is to eventually -have a flexible event bus that can be easily brought into other apps and repositories to produce and consume arbitrary -topics. Ideally, the event bus itself will also be an abstraction behind which platform maintainers can use non-Kafka -implementations (Redis, Pulsar, etc.). The documentation/ADRs may also be moved to more appropriate places as the -process matures. - -There are a hefty number of "# TODO (EventBus)" annotations left in to help guide further development. This app is intended to be subject to frequent and rapid changes. Outside of testing this app, it is best to leave the -KAFKA_CONSUMERS_ENABLED setting off. - diff --git a/openedx/core/djangoapps/kafka_consumer/__init__.py b/openedx/core/djangoapps/kafka_consumer/__init__.py deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/openedx/core/djangoapps/kafka_consumer/apps.py b/openedx/core/djangoapps/kafka_consumer/apps.py deleted file mode 100644 index 1d6896457430..000000000000 --- a/openedx/core/djangoapps/kafka_consumer/apps.py +++ /dev/null @@ -1,7 +0,0 @@ -"""App for consuming Kafka events. Comprises a management command for listening to a topic and supporting methods. -Likely temporary.""" -from django.apps import AppConfig - - -class KafkaConsumerApp(AppConfig): - name = 'openedx.core.djangoapps.kafka_consumer' diff --git a/openedx/core/djangoapps/kafka_consumer/consumers.py b/openedx/core/djangoapps/kafka_consumer/consumers.py deleted file mode 100644 index ac098c90fdc4..000000000000 --- a/openedx/core/djangoapps/kafka_consumer/consumers.py +++ /dev/null @@ -1,99 +0,0 @@ -"""Event handling for license-manager events. Likely temporary until an abstraction layer is put in place.""" - -import logging - -from confluent_kafka.schema_registry.avro import AvroDeserializer -from django.conf import settings - -logger = logging.getLogger(__name__) - - -# TODO (EventBus): -# Use TrackingEvent class from openedx_events and use Attr <-> Avro bridge to deserialize -class TrackingEvent: - """ - Temporary copy of TrackingEvent from license-manager to represent license manager events. - Eventually to be moved to openedx_events. - """ - - def __init__(self, *args, **kwargs): - self.license_uuid = kwargs.get('license_uuid', None) - self.license_activation_key = kwargs.get('license_activation_key', None) - self.previous_license_uuid = kwargs.get('previous_license_uuid', None) - self.assigned_date = kwargs.get('assigned_date', None) - self.activation_date = kwargs.get('activation_date', None) - self.assigned_lms_user_id = kwargs.get('assigned_lms_user_id', None) - self.assigned_email = kwargs.get('assigned_email', None) - self.expiration_processed = kwargs.get('expiration_processed', None) - self.auto_applied = kwargs.get('auto_applied', None) - self.enterprise_customer_uuid = kwargs.get('enterprise_customer_uuid', None) - self.enterprise_customer_slug = kwargs.get('enterprise_customer_slug', None) - self.enterprise_customer_name = kwargs.get('enterprise_customer_name', None) - self.customer_agreement_uuid = kwargs.get('customer_agreement_uuid', None) - - # Some paths will set assigned_lms_user_id to '' if empty, so need to allow strings in the schema - TRACKING_EVENT_AVRO_SCHEMA = """ - { - "namespace": "license_manager.apps.subscriptions", - "name": "TrackingEvent", - "type": "record", - "fields": [ - {"name": "license_uuid", "type": "string"}, - {"name": "license_activation_key", "type": "string"}, - {"name": "previous_license_uuid", "type": "string"}, - {"name": "assigned_date", "type": "string"}, - {"name": "assigned_lms_user_id", "type": ["int", "string", "null"], "default": "null"}, - {"name": "assigned_email", "type":"string"}, - {"name": "expiration_processed", "type": "boolean"}, - {"name": "auto_applied", "type": "boolean", "default": "false"}, - {"name": "enterprise_customer_uuid", "type": ["string", "null"], "default": "null"}, - {"name": "customer_agreement_uuid", "type": ["string", "null"], "default": "null"}, - {"name": "enterprise_customer_slug", "type": ["string", "null"], "default": "null"}, - {"name": "enterprise_customer_name", "type": ["string", "null"], "default": "null"} - ] - } - - """ - - @staticmethod - def from_dict(dict_instance, ctx=None): # pylint: disable=unused-argument - return TrackingEvent(**dict_instance) - - @staticmethod - def to_dict(obj, ctx=None): # pylint: disable=unused-argument - # remove lms id and email from to_dict for event consumer to not print PII - return { - 'enterprise_customer_uuid': obj.enterprise_customer_uuid, - 'customer_agreement_uuid': obj.customer_agreement_uuid, - 'enterprise_customer_slug': obj.enterprise_customer_slug, - 'enterprise_customer_name': obj.enterprise_customer_name, - "license_uuid": obj.license_uuid, - "license_activation_key": obj.license_activation_key, - "previous_license_uuid": obj.previous_license_uuid, - "assigned_date": obj.assigned_date, - "activation_date": obj.activation_date, - "expiration_processed": obj.expiration_processed, - "auto_applied": (obj.auto_applied or False), - } - - -## TODO (EventBus): Make an extensible base class for message handlers -class LicenseMessageHandler: - """ Simple class to deserialize and print LicenseEvents from Kafka""" - @staticmethod - def getDeserializer(schema_registry_client): - return AvroDeserializer(schema_str=TrackingEvent.TRACKING_EVENT_AVRO_SCHEMA, - schema_registry_client=schema_registry_client, - from_dict=TrackingEvent.from_dict) - - @staticmethod - def handleMessage(msg): - logger.info(f"Received message with key {msg.key()} and value {TrackingEvent.to_dict(msg.value())}") - - -## TODO (EventBus): Find a better place to have the list of topics -def getHandler(topic_name): - TOPIC_HANDLERS = { - settings.LICENSE_EVENT_TOPIC_NAME: LicenseMessageHandler - } - return TOPIC_HANDLERS[topic_name] diff --git a/openedx/core/djangoapps/kafka_consumer/docs/0001_kafka_consumer_command.rst b/openedx/core/djangoapps/kafka_consumer/docs/0001_kafka_consumer_command.rst deleted file mode 100644 index a9dfa7995363..000000000000 --- a/openedx/core/djangoapps/kafka_consumer/docs/0001_kafka_consumer_command.rst +++ /dev/null @@ -1,30 +0,0 @@ -Managing Kafka Consumers --------------- - -Status -====== - -In Progress - -Context -======= -As outlined in the upcoming OEP-52, edX.org has elected to go with Apache Kafka as our event bus implementation. Though the decision presented here is predicated on this particular edX.org decision, it is included to help other Open edX users evaluate Kafka for their own purposes. The standard pattern for consuming events with Kafka is to poll in a loop and process messages as they come in. According to the Confluent team it is a best practice to limit each consumer to a single topic (Confluent is a platform for industry-scale Kafka management):: - - consumer.subscribe(["topic"]) - while True: - message = consumer.poll() - ## process message - -This ``while True`` loop means whatever is running this consumer will run infinitely and block whatever thread runs it from doing anything else. Thus, this code cannot be run as part of the regular Django web server. It also would not fit neatly onto a celery task, which would put it in direct competition for workers with all other celery tasks and be difficult to scale as the number of topics increases. - -Decision -======== -edX.org will use Kubernetes to manage containers whose sole purpose is to run a management command, which in turn will run a polling loop against the specified topic. This will enable standard horizontal scaling of Kafka consumer groups. - -Rejected Alternatives -===================== - -#. Create a new ASG of EC2 instances dedicated to running a consumer management command, similar to how we create instances dedicated to running celery workers - * edX and the industry in general we are moving away from the ASG pattern and on to Kubernetes. Both the ASG approach and the Kubernetes approach would require a substantial amount of work in order to make the number of instances scalable based on number of topics rather than built-in measurements like CPU load. Based on this, it makes more sense to put in the effort in Kubernetes rather than creating more outdated infrastructure. -#. Django-channels - * Research turned up the possibility of using django-channels (websocket equivalent for Django) for use with Kafka, but the design and potential benefit was unclear so this was not pursued further diff --git a/openedx/core/djangoapps/kafka_consumer/management/__init__.py b/openedx/core/djangoapps/kafka_consumer/management/__init__.py deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/openedx/core/djangoapps/kafka_consumer/management/commands/__init__.py b/openedx/core/djangoapps/kafka_consumer/management/commands/__init__.py deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/openedx/core/djangoapps/kafka_consumer/management/commands/consume_events.py b/openedx/core/djangoapps/kafka_consumer/management/commands/consume_events.py deleted file mode 100644 index 4e26f164209a..000000000000 --- a/openedx/core/djangoapps/kafka_consumer/management/commands/consume_events.py +++ /dev/null @@ -1,118 +0,0 @@ -""" -Management command for listening to license-manager events and logging them -""" - -import logging - -from confluent_kafka import DeserializingConsumer, KafkaError -from confluent_kafka.schema_registry import SchemaRegistryClient -from confluent_kafka.serialization import StringDeserializer -from django.conf import settings -from django.core.management.base import BaseCommand -from edx_toggles.toggles import SettingToggle - -from openedx.core.djangoapps.kafka_consumer.consumers import getHandler - -logger = logging.getLogger(__name__) - -# .. toggle_name: KAFKA_CONSUMERS_ENABLED -# .. toggle_implementation: SettingToggle -# .. toggle_default: False -# .. toggle_description: Enables the ability to listen and process events from the Kafka event bus -# .. toggle_use_cases: opt_in -# .. toggle_creation_date: 2022-01-31 -# .. toggle_tickets: https://openedx.atlassian.net/browse/ARCHBOM-1992 -KAFKA_CONSUMERS_ENABLED = SettingToggle('KAFKA_CONSUMERS_ENABLED', default=False) - -CONSUMER_POLL_TIMEOUT = getattr(settings, 'CONSUMER_POLL_TIMEOUT', 1.0) - - -class Command(BaseCommand): - """ - Listen for events from the event bus and log them. Only run on servers where KAFKA_CONSUMERS_ENABLED is true - """ - help = """ - This starts a Kafka event consumer that listens to the specified topic and logs all messages it receives. Topic - is required. - - example: - manage.py ... consume_events -t license-event-prod -g license-event-consumers - - # TODO (EventBus): Add pointer to relevant future docs around topics and consumer groups, and potentially - update example topic and group names to follow any future naming conventions. - - """ - - def add_arguments(self, parser): - - parser.add_argument( - '-t', '--topic', - nargs=1, - required=True, - help='Topic to consume' - ) - - parser.add_argument( - '-g', '--group_id', - nargs=1, - required=True, - help='Consumer group id' - ) - - def handle(self, *args, **options): - if not KAFKA_CONSUMERS_ENABLED.is_enabled(): - logger.error("Kafka consumers not enabled") - return - try: - KAFKA_SCHEMA_REGISTRY_CONFIG = { - 'url': settings.SCHEMA_REGISTRY_URL, - 'basic.auth.user.info': f"{settings.SCHEMA_REGISTRY_API_KEY}:{settings.SCHEMA_REGISTRY_API_SECRET}", - } - - schema_registry_client = SchemaRegistryClient(KAFKA_SCHEMA_REGISTRY_CONFIG) - - topic = options['topic'][0] - - HandlerClass = getHandler(topic) - - # TODO (EventBus): - # 1. generalize configurations to allow connection to local Kafka clusters without SSL - # 2. Reevaluate if all consumers should listen for the earliest unprocessed offset (auto.offset.reset) - - consumer = DeserializingConsumer({ - 'bootstrap.servers': settings.KAFKA_BOOTSTRAP_SERVER, - 'group.id': options["group_id"][0], - 'key.deserializer': StringDeserializer('utf-8'), - 'value.deserializer': HandlerClass.getDeserializer(schema_registry_client), - 'auto.offset.reset': 'earliest', - 'sasl.mechanism': 'PLAIN', - 'security.protocol': 'SASL_SSL', - 'sasl.username': settings.KAFKA_API_KEY, - 'sasl.password': settings.KAFKA_API_SECRET, - }) - - try: - consumer.subscribe([topic]) - - # TODO (EventBus): - # 1. Is there an elegant way to exit the loop? - # 2. Determine if there are other errors that shouldn't kill the entire loop - while True: - msg = consumer.poll(timeout=CONSUMER_POLL_TIMEOUT) - if msg is None: - continue - if msg.error(): - # TODO (EventBus): iterate on error handling with retry and dead-letter queue topics - if msg.error().code() == KafkaError._PARTITION_EOF: # pylint: disable=protected-access - # End of partition event - logger.info(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset}") - elif msg.error(): - logger.exception(msg.error()) - continue - HandlerClass.handleMessage(msg) - finally: - # Close down consumer to commit final offsets. - consumer.close() - logger.info("Committing final offsets") - except Exception: # pylint: disable=broad-except - logger.exception("Error consuming Kafka events") From e71d68bce0ceb641c41cab200ca41b4932a6cb96 Mon Sep 17 00:00:00 2001 From: Rebecca Graber Date: Fri, 25 Feb 2022 16:55:36 -0500 Subject: [PATCH 2/3] feat: rm unused libraries --- requirements/edx/base.in | 2 -- 1 file changed, 2 deletions(-) diff --git a/requirements/edx/base.in b/requirements/edx/base.in index 3ecd8759ef00..6e8b1c072370 100644 --- a/requirements/edx/base.in +++ b/requirements/edx/base.in @@ -36,7 +36,6 @@ botocore==1.8.17 # via boto3, s3transfer bridgekeeper # Used for determining permissions for courseware. celery # Asynchronous task execution library chem # A helper library for chemistry calculations -confluent_kafka contextlib2 # We need contextlib2.ExitStack so we can stop using contextlib.nested which doesn't exist in python 3 crowdsourcehinter-xblock cryptography # Implementations of assorted cryptography algorithms @@ -97,7 +96,6 @@ edx-user-state-client edx-when edxval event-tracking -fastavro fs==2.0.18 fs-s3fs==0.1.8 geoip2 # Python API for the GeoIP web services and databases From 415e3c2eeb2829bd1a50dbd839e3ebda470e5d38 Mon Sep 17 00:00:00 2001 From: Rebecca Graber Date: Fri, 4 Mar 2022 16:17:02 -0500 Subject: [PATCH 3/3] fix: reqs --- requirements/common_constraints.txt | 5 +++++ requirements/edx/base.txt | 6 +----- requirements/edx/development.txt | 2 -- requirements/edx/testing.txt | 3 --- 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/requirements/common_constraints.txt b/requirements/common_constraints.txt index 72021fe71f92..2fa9fe0e3347 100644 --- a/requirements/common_constraints.txt +++ b/requirements/common_constraints.txt @@ -3,6 +3,11 @@ # See BOM-2721 for more details. # Below is the copied and edited version of common_constraints +# This is a temporary solution to override the real common_constraints.txt +# In edx-lint, until the pyjwt constraint in edx-lint has been removed. +# See BOM-2721 for more details. +# Below is the copied and edited version of common_constraints + # A central location for most common version constraints # (across edx repos) for pip-installation. # diff --git a/requirements/edx/base.txt b/requirements/edx/base.txt index b920b7fa44c8..3a95b7ba8517 100644 --- a/requirements/edx/base.txt +++ b/requirements/edx/base.txt @@ -129,8 +129,6 @@ code-annotations==1.3.0 # via # edx-enterprise # edx-toggles -confluent-kafka==1.8.2 - # via -r requirements/edx/base.in contextlib2==21.6.0 # via -r requirements/edx/base.in coreapi==2.3.3 @@ -526,9 +524,7 @@ event-tracking==2.0.0 # edx-proctoring # edx-search fastavro==1.4.9 - # via - # -r requirements/edx/base.in - # openedx-events + # via openedx-events frozenlist==1.3.0 # via # aiohttp diff --git a/requirements/edx/development.txt b/requirements/edx/development.txt index a530ffa50875..a992dd5cb1f0 100644 --- a/requirements/edx/development.txt +++ b/requirements/edx/development.txt @@ -183,8 +183,6 @@ code-annotations==1.3.0 # edx-enterprise # edx-lint # edx-toggles -confluent-kafka==1.8.2 - # via -r requirements/edx/testing.txt contextlib2==21.6.0 # via -r requirements/edx/testing.txt coreapi==2.3.3 diff --git a/requirements/edx/testing.txt b/requirements/edx/testing.txt index 36bfe2a11b96..7de816d4ea18 100644 --- a/requirements/edx/testing.txt +++ b/requirements/edx/testing.txt @@ -75,7 +75,6 @@ attrs==21.4.0 # aiohttp # edx-ace # openedx-events - # outcome # pytest babel==2.9.1 # via @@ -174,8 +173,6 @@ code-annotations==1.3.0 # edx-enterprise # edx-lint # edx-toggles -confluent-kafka==1.8.2 - # via -r requirements/edx/base.txt contextlib2==21.6.0 # via -r requirements/edx/base.txt coreapi==2.3.3