Skip to content

Commit

Permalink
feat: emit transaction lifecycle events
Browse files Browse the repository at this point in the history
Emit openedx events (to the event bus) when transactions are created, committed,
failed, and reversed.
Note this temporarily points at a public (personal) fork of openedx-events, so
that we can run CI (or maybe even deploy) without merging to the upstream repo.

ENT-8761
  • Loading branch information
iloveagent57 committed May 21, 2024
1 parent 482a343 commit 8a67960
Show file tree
Hide file tree
Showing 17 changed files with 203 additions and 20 deletions.
91 changes: 91 additions & 0 deletions enterprise_subsidy/apps/core/event_bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""
Functions for serializing and emiting Open edX event bus signals.
"""
from openedx_events.enterprise.data import LedgerTransaction, LedgerTransactionReversal
from openedx_events.enterprise.signals import (
LEDGER_TRANSACTION_CREATED,
LEDGER_TRANSACTION_COMMITTED,
LEDGER_TRANSACTION_FAILED,
LEDGER_TRANSACTION_REVERSED,
)


def serialize_transaction(transaction_record):
"""
Serializes the ``transaction_record``into a defined set of attributes
for use in the event-bus signal.
"""
reversal_data = None
if reversal_record := transaction_record.get_reversal():
reversal_data = LedgerTransactionReversal(
uuid=reversal_record.uuid,
created=reversal_record.created,
modified=reversal_record.modified,
idempotency_key=reversal_record.idempotency_key,
quantity=reversal_record.quantity,
state=reversal_record.state,
)
data = LedgerTransaction(
uuid=transaction_record.uuid,
created=transaction_record.created,
modified=transaction_record.modified,
idempotency_key=transaction_record.idempotency_key,
quantity=transaction_record.quantity,
state=transaction_record.state,
ledger_uuid=transaction_record.ledger.uuid,
subsidy_access_policy_uuid=transaction_record.subsidy_access_policy_uuid,
lms_user_id=transaction_record.lms_user_id,
content_key=transaction_record.content_key,
parent_content_key=transaction_record.parent_content_key,
fulfillment_identifier=transaction_record.fulfillment_identifier,
reversal=reversal_data,
)
return data


def send_transaction_created_event(transaction_record):
"""
Sends the LEDGER_TRANSACTION_CREATED open edx event for the given ``transaction_record``.
Parameters:
transaction_record (openedx_ledger.models.Transaction): A transaction record.
"""
LEDGER_TRANSACTION_CREATED.send_event(
ledger_transaction=serialize_transaction(transaction_record),
)


def send_transaction_committed_event(transaction_record):
"""
Sends the LEDGER_TRANSACTION_COMMITTED open edx event for the given ``transaction_record``.
Parameters:
transaction_record (openedx_ledger.models.Transaction): A transaction record.
"""
LEDGER_TRANSACTION_COMMITTED.send_event(
ledger_transaction=serialize_transaction(transaction_record),
)


def send_transaction_failed_event(transaction_record):
"""
Sends the LEDGER_TRANSACTION_FAILED open edx event for the given ``transaction_record``.
Parameters:
transaction_record (openedx_ledger.models.Transaction): A transaction record.
"""
LEDGER_TRANSACTION_FAILED.send_event(
ledger_transaction=serialize_transaction(transaction_record),
)


def send_transaction_reversed_event(transaction_record):
"""
Sends the LEDGER_TRANSACTION_REVERSED open edx event for the given ``transaction_record``.
Parameters:
transaction_record (openedx_ledger.models.Transaction): A transaction record.
"""
LEDGER_TRANSACTION_REVERSED.send_event(
ledger_transaction=serialize_transaction(transaction_record),
)
7 changes: 6 additions & 1 deletion enterprise_subsidy/apps/subsidy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from enterprise_subsidy.apps.api_client.enterprise import EnterpriseApiClient
from enterprise_subsidy.apps.api_client.lms_user import LmsUserApiClient
from enterprise_subsidy.apps.content_metadata.api import ContentMetadataApi
from enterprise_subsidy.apps.core import event_bus
from enterprise_subsidy.apps.core.utils import localized_utcnow
from enterprise_subsidy.apps.fulfillment.api import GEAGFulfillmentHandler

Expand Down Expand Up @@ -370,7 +371,7 @@ def create_transaction(
openedx_ledger.api.LedgerBalanceExceeded:
Raises this if the transaction would cause the balance of the ledger to become negative.
"""
return ledger_api.create_transaction(
ledger_transaction = ledger_api.create_transaction(
ledger=self.ledger,
quantity=quantity,
idempotency_key=idempotency_key,
Expand All @@ -382,6 +383,8 @@ def create_transaction(
subsidy_access_policy_uuid=subsidy_access_policy_uuid,
**transaction_metadata,
)
event_bus.send_transaction_created_event(ledger_transaction)
return ledger_transaction

def commit_transaction(self, ledger_transaction, fulfillment_identifier=None, external_reference=None):
"""
Expand All @@ -402,6 +405,7 @@ def commit_transaction(self, ledger_transaction, fulfillment_identifier=None, ex
ledger_transaction.external_reference.set([external_reference])
ledger_transaction.state = TransactionStateChoices.COMMITTED
ledger_transaction.save()
event_bus.send_transaction_committed_event(ledger_transaction)

def rollback_transaction(self, ledger_transaction):
"""
Expand All @@ -410,6 +414,7 @@ def rollback_transaction(self, ledger_transaction):
logger.info(f'Setting transaction {ledger_transaction.uuid} state to failed.')
ledger_transaction.state = TransactionStateChoices.FAILED
ledger_transaction.save()
event_bus.send_transaction_failed_event(ledger_transaction)

def redeem(
self,
Expand Down
6 changes: 5 additions & 1 deletion enterprise_subsidy/apps/transaction/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from openedx_ledger.models import TransactionStateChoices

from enterprise_subsidy.apps.api_client.enterprise import EnterpriseApiClient
from enterprise_subsidy.apps.core.event_bus import send_transaction_reversed_event
from enterprise_subsidy.apps.fulfillment.api import GEAGFulfillmentHandler
from enterprise_subsidy.apps.transaction.utils import generate_transaction_reversal_idempotency_key

Expand Down Expand Up @@ -88,7 +89,10 @@ def reverse_transaction(transaction, unenroll_time=None):
transaction.fulfillment_identifier,
unenroll_time or timezone.now(),
)
return reverse_full_transaction(
reversal = reverse_full_transaction(
transaction=transaction,
idempotency_key=idempotency_key,
)
transaction.refresh_from_db()
send_transaction_reversed_event(transaction)
return reversal
3 changes: 3 additions & 0 deletions enterprise_subsidy/apps/transaction/signals/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from django.dispatch import receiver
from openedx_ledger.signals.signals import TRANSACTION_REVERSED

from enterprise_subsidy.apps.core.event_bus import send_transaction_reversed_event

from ..api import cancel_transaction_external_fulfillment, cancel_transaction_fulfillment
from ..exceptions import TransactionFulfillmentCancelationException

Expand All @@ -29,6 +31,7 @@ def listen_for_transaction_reversal(sender, **kwargs):
try:
cancel_transaction_external_fulfillment(transaction)
cancel_transaction_fulfillment(transaction)
send_transaction_reversed_event(transaction)
except TransactionFulfillmentCancelationException as exc:
error_msg = f"Error canceling platform fulfillment {transaction.fulfillment_identifier}: {exc}"
logger.exception(error_msg)
Expand Down
16 changes: 13 additions & 3 deletions enterprise_subsidy/apps/transaction/tests/test_signal_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ class TransactionSignalHandlerTestCase(TestCase):
Tests for the transaction signal handlers
"""

@mock.patch('enterprise_subsidy.apps.transaction.signals.handlers.send_transaction_reversed_event')
@mock.patch('enterprise_subsidy.apps.api_client.base_oauth.OAuthAPIClient', return_value=mock.MagicMock())
def test_transaction_reversed_signal_handler_catches_event(self, mock_oauth_client):
def test_transaction_reversed_signal_handler_catches_event(self, mock_oauth_client, mock_send_event_bus_reversed):
"""
Test that the transaction reversed signal handler catches the transaction reversed event when it's emitted
"""
Expand All @@ -38,10 +39,14 @@ def test_transaction_reversed_signal_handler_catches_event(self, mock_oauth_clie
EnterpriseApiClient.enterprise_subsidy_fulfillment_endpoint +
f"{transaction.fulfillment_identifier}/cancel-fulfillment",
)
mock_send_event_bus_reversed.assert_called_once_with(transaction)

@mock.patch('enterprise_subsidy.apps.transaction.signals.handlers.send_transaction_reversed_event')
@mock.patch('enterprise_subsidy.apps.fulfillment.api.GetSmarterEnterpriseApiClient')
@mock.patch('enterprise_subsidy.apps.api_client.base_oauth.OAuthAPIClient', return_value=mock.MagicMock())
def test_reversed_signal_causes_internal_and_external_unfulfillment(self, mock_oauth_client, mock_geag_client):
def test_reversed_signal_causes_internal_and_external_unfulfillment(
self, mock_oauth_client, mock_geag_client, mock_send_event_bus_reversed
):
"""
Tests that the signal handler cancels internal and external fulfillments
related to the reversed transaction.
Expand All @@ -67,9 +72,13 @@ def test_reversed_signal_causes_internal_and_external_unfulfillment(self, mock_o
mock_geag_client().cancel_enterprise_allocation.assert_called_once_with(
geag_reference.external_reference_id,
)
mock_send_event_bus_reversed.assert_called_once_with(transaction)

@mock.patch('enterprise_subsidy.apps.transaction.signals.handlers.send_transaction_reversed_event')
@mock.patch('enterprise_subsidy.apps.api_client.base_oauth.OAuthAPIClient', return_value=mock.MagicMock())
def test_transaction_reversed_signal_without_fulfillment_identifier(self, mock_oauth_client):
def test_transaction_reversed_signal_without_fulfillment_identifier(
self, mock_oauth_client, mock_send_event_bus_reversed
):
"""
Test that the transaction reversed signal handler does not call the api client if the transaction has no
fulfillment identifier
Expand All @@ -82,3 +91,4 @@ def test_transaction_reversed_signal_without_fulfillment_identifier(self, mock_o
TRANSACTION_REVERSED.send(sender=self, reversal=reversal)

assert mock_oauth_client.return_value.post.call_count == 0
self.assertFalse(mock_send_event_bus_reversed.called)
44 changes: 44 additions & 0 deletions enterprise_subsidy/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,47 @@ def root(*path_fragments):
# be more drift between the time of allocation and the time of redemption.
ALLOCATION_PRICE_VALIDATION_LOWER_BOUND_RATIO = .80
ALLOCATION_PRICE_VALIDATION_UPPER_BOUND_RATIO = 1.20

# Kafka and event broker settings
TRANSACTION_LIFECYCLE_TOPIC = "enterprise-subsidies-transaction-lifecycle"
TRANSACTION_CREATED_EVENT_NAME = "org.openedx.enterprise_subsidies.ledger_transaction.created.v1"
TRANSACTION_COMMITTED_EVENT_NAME = "org.openedx.enterprise_subsidies.ledger_transaction.committed.v1"
TRANSACTION_FAILED_EVENT_NAME = "org.openedx.enterprise_subsidies.ledger_transaction.failed.v1"
TRANSACTION_REVERSED_EVENT_NAME = "org.openedx.enterprise_subsidies.ledger_transaction.reversed.v1"

# .. setting_name: EVENT_BUS_PRODUCER_CONFIG
# .. setting_default: all events disabled
# .. setting_description: Dictionary of event_types mapped to dictionaries of topic to topic-related configuration.
# Each topic configuration dictionary contains
# * `enabled`: a toggle denoting whether the event will be published to the topic. These should be annotated
# according to
# https://edx.readthedocs.io/projects/edx-toggles/en/latest/how_to/documenting_new_feature_toggles.html
# * `event_key_field` which is a period-delimited string path to event data field to use as event key.
# Note: The topic names should not include environment prefix as it will be dynamically added based on
# EVENT_BUS_TOPIC_PREFIX setting.
EVENT_BUS_PRODUCER_CONFIG = {
TRANSACTION_CREATED_EVENT_NAME: {
TRANSACTION_LIFECYCLE_TOPIC: {
'event_key_field': 'ledger_transaction.uuid',
'enabled': False,
},
},
TRANSACTION_COMMITTED_EVENT_NAME: {
TRANSACTION_LIFECYCLE_TOPIC: {
'event_key_field': 'ledger_transaction.uuid',
'enabled': False,
},
},
TRANSACTION_FAILED_EVENT_NAME: {
TRANSACTION_LIFECYCLE_TOPIC: {
'event_key_field': 'ledger_transaction.uuid',
'enabled': False,
},
},
TRANSACTION_REVERSED_EVENT_NAME: {
TRANSACTION_LIFECYCLE_TOPIC: {
'event_key_field': 'ledger_transaction.uuid',
'enabled': False,
},
},
}
12 changes: 11 additions & 1 deletion enterprise_subsidy/settings/devstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,14 @@
EVENT_BUS_CONSUMER = 'edx_event_bus_kafka.KafkaEventConsumer'
EVENT_BUS_TOPIC_PREFIX = 'dev'

# Application settings go here...
EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_CREATED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True
EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_COMMITTED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True
EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_FAILED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True
EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_REVERSED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True

# Private settings
# The local.py settings file also does this, but then this current file (devstack.py)
# imports *from* local.py, so anything earlier in this file overrides what's in private.py
# We want private.py to have the highest precedence, so re-import private settings again here.
if os.path.isfile(join(dirname(abspath(__file__)), 'private.py')):
from .private import * # pylint: disable=import-error
13 changes: 13 additions & 0 deletions enterprise_subsidy/settings/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,16 @@
GET_SMARTER_OAUTH2_KEY = 'get-smarter-key'
GET_SMARTER_OAUTH2_SECRET = 'get-smarter-secret'
GET_SMARTER_OAUTH2_PROVIDER_URL = 'https://get-smarter.provider.url'

# Kafka Settings
# We set to fake server addresses because we shouldn't actually be emitting real events during tests
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL = 'http://test.schema-registry:8000'
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS = 'test.kafka:8001'
EVENT_BUS_PRODUCER = 'edx_event_bus_kafka.create_producer'
EVENT_BUS_CONSUMER = 'edx_event_bus_kafka.KafkaEventConsumer'
EVENT_BUS_TOPIC_PREFIX = 'dev-test'

EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_CREATED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True
EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_COMMITTED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True
EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_FAILED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True
EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_REVERSED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True
5 changes: 4 additions & 1 deletion requirements/base.in
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ edx-rbac
edx-rest-api-client
jsonfield2
mysqlclient
openedx-events
# openedx-events
openedx-ledger
pymemcache
pytz
rules
getsmarter-api-clients
django-log-request-id
django-clearcache

# Temporary pin to a fork to use new enterprise events
git+https://github.com/iloveagent57/openedx-events.git@67d7bc392af88996ca1e63d789c6017248dac305
2 changes: 1 addition & 1 deletion requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ oauthlib==3.2.2
# getsmarter-api-clients
# requests-oauthlib
# social-auth-core
openedx-events==9.10.0
openedx-events @ git+https://github.com/iloveagent57/openedx-events.git@67d7bc392af88996ca1e63d789c6017248dac305
# via
# -r requirements/base.in
# edx-event-bus-kafka
Expand Down
4 changes: 2 additions & 2 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ oauthlib==3.2.2
# getsmarter-api-clients
# requests-oauthlib
# social-auth-core
openedx-events==9.10.0
openedx-events @ git+https://github.com/iloveagent57/openedx-events.git@67d7bc392af88996ca1e63d789c6017248dac305
# via
# -r requirements/validation.txt
# edx-event-bus-kafka
Expand Down Expand Up @@ -473,7 +473,7 @@ pyjwt[crypto]==2.8.0
# edx-drf-extensions
# edx-rest-api-client
# social-auth-core
pylint==3.2.1
pylint==3.2.2
# via
# -r requirements/validation.txt
# edx-lint
Expand Down
4 changes: 2 additions & 2 deletions requirements/doc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ oauthlib==3.2.2
# getsmarter-api-clients
# requests-oauthlib
# social-auth-core
openedx-events==9.10.0
openedx-events @ git+https://github.com/iloveagent57/openedx-events.git@67d7bc392af88996ca1e63d789c6017248dac305
# via
# -r requirements/test.txt
# edx-event-bus-kafka
Expand Down Expand Up @@ -446,7 +446,7 @@ pyjwt[crypto]==2.8.0
# edx-drf-extensions
# edx-rest-api-client
# social-auth-core
pylint==3.2.1
pylint==3.2.2
# via
# -r requirements/test.txt
# edx-lint
Expand Down
2 changes: 1 addition & 1 deletion requirements/pip.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ wheel==0.43.0
# The following packages are considered to be unsafe in a requirements file:
pip==24.0
# via -r requirements/pip.in
setuptools==69.5.1
setuptools==70.0.0
# via -r requirements/pip.in
2 changes: 1 addition & 1 deletion requirements/production.txt
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ oauthlib==3.2.2
# getsmarter-api-clients
# requests-oauthlib
# social-auth-core
openedx-events==9.10.0
openedx-events @ git+https://github.com/iloveagent57/openedx-events.git@67d7bc392af88996ca1e63d789c6017248dac305
# via
# -r requirements/base.txt
# edx-event-bus-kafka
Expand Down
4 changes: 2 additions & 2 deletions requirements/quality.txt
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ oauthlib==3.2.2
# getsmarter-api-clients
# requests-oauthlib
# social-auth-core
openedx-events==9.10.0
openedx-events @ git+https://github.com/iloveagent57/openedx-events.git@67d7bc392af88996ca1e63d789c6017248dac305
# via
# -r requirements/test.txt
# edx-event-bus-kafka
Expand Down Expand Up @@ -420,7 +420,7 @@ pyjwt[crypto]==2.8.0
# edx-drf-extensions
# edx-rest-api-client
# social-auth-core
pylint==3.2.1
pylint==3.2.2
# via
# -r requirements/test.txt
# edx-lint
Expand Down
Loading

0 comments on commit 8a67960

Please sign in to comment.