From 46ba27af962362f3bf5c6e6ced1d8492ccdb73e3 Mon Sep 17 00:00:00 2001 From: Brian Mesick <112640379+bmtcril@users.noreply.github.com> Date: Tue, 20 Jun 2023 13:05:08 -0400 Subject: [PATCH] feat: Management command to batch load tracking logs (#301) * feat: Add command to bulk transform event log files This management command can pull from local storage or many types of cloud storage to transform tracking logs into xAPI or Caliper, and save them to local, cloud, or LRS. * feat: Make logging of transformed statements toggleable Added new feature toggles for logging xapi and caliper statements to help performance when they're not needed. --- docs/getting_started.rst | 4 + docs/howto/how_to_bulk_transform.rst | 121 ++++++ .../backends/events_router.py | 180 +++++--- .../backends/tests/test_events_router.py | 384 ++++++++++++++++- event_routing_backends/helpers.py | 7 +- event_routing_backends/management/__init__.py | 0 .../management/commands/__init__.py | 0 .../management/commands/helpers/__init__.py | 0 .../commands/helpers/event_log_parser.py | 56 +++ .../commands/helpers/queued_sender.py | 169 ++++++++ .../commands/tests/fixtures/tracking.log | 12 + .../tests/test_transform_tracking_logs.py | 396 ++++++++++++++++++ .../commands/transform_tracking_logs.py | 272 ++++++++++++ event_routing_backends/models.py | 2 +- .../processors/caliper/__init__.py | 14 + .../processors/caliper/tests/test_caliper.py | 51 ++- .../caliper/transformer_processor.py | 14 +- .../processors/mixins/base_transformer.py | 7 +- .../processors/xapi/__init__.py | 13 + .../event_transformers/navigation_events.py | 2 +- .../problem_interaction_events.py | 25 +- .../xapi/event_transformers/video_events.py | 11 +- .../edx.grades.problem.submitted.json | 2 +- .../processors/xapi/tests/test_xapi.py | 35 +- .../processors/xapi/transformer.py | 3 +- .../processors/xapi/transformer_processor.py | 14 +- event_routing_backends/settings/common.py | 2 + event_routing_backends/settings/production.py | 8 + event_routing_backends/tasks.py | 52 +++ event_routing_backends/tests/test_helpers.py | 7 + event_routing_backends/tests/test_settings.py | 8 + event_routing_backends/utils/http_client.py | 36 ++ .../utils/xapi_lrs_client.py | 24 ++ requirements/base.in | 4 +- requirements/base.txt | 10 +- requirements/dev.txt | 12 +- requirements/doc.txt | 14 +- requirements/quality.txt | 10 +- requirements/test.txt | 10 +- test_settings.py | 7 +- 40 files changed, 1881 insertions(+), 117 deletions(-) create mode 100644 docs/howto/how_to_bulk_transform.rst create mode 100644 event_routing_backends/management/__init__.py create mode 100644 event_routing_backends/management/commands/__init__.py create mode 100644 event_routing_backends/management/commands/helpers/__init__.py create mode 100644 event_routing_backends/management/commands/helpers/event_log_parser.py create mode 100644 event_routing_backends/management/commands/helpers/queued_sender.py create mode 100644 event_routing_backends/management/commands/tests/fixtures/tracking.log create mode 100644 event_routing_backends/management/commands/tests/test_transform_tracking_logs.py create mode 100644 event_routing_backends/management/commands/transform_tracking_logs.py diff --git a/docs/getting_started.rst b/docs/getting_started.rst index 6649795a..2db9b72b 100644 --- a/docs/getting_started.rst +++ b/docs/getting_started.rst @@ -70,6 +70,10 @@ Two types of configuration are needed for the plugin: By default, both ``xapi`` and ``caliper`` backends are already configured along with filters that 
allow all the supported events. The ``caliper`` backend is disabled by default and can be enabled by setting ``CALIPER_EVENTS_ENABLED`` to ``True`` in plugin settings.
 
+Additionally, separate log streams for xAPI and Caliper statements are generated as events are transformed, and can be saved or ignored. They are configured as described in the `Django logging docs <https://docs.djangoproject.com/en/stable/topics/logging/>`_ under the ``xapi_tracking`` and ``caliper_tracking`` logger names.
+
+
 
 Router configuration
 --------------------
diff --git a/docs/howto/how_to_bulk_transform.rst b/docs/howto/how_to_bulk_transform.rst
new file mode 100644
index 00000000..bc0cdaf6
--- /dev/null
+++ b/docs/howto/how_to_bulk_transform.rst
@@ -0,0 +1,121 @@
+How To Bulk Transform Tracking Logs
+===================================
+
+This is a rough guide to transforming existing tracking log files into the formats supported by event-routing-backends, using the ``transform_tracking_logs`` Django management command inside a running LMS installation. Because the transformations perform database access to look up user, course, and block data, you will need to run this command on the same installation of Open edX that created the tracking log files.
+
+.. warning:: This also means that performing large numbers of transformations can cause performance issues on the LMS and downstream learning record stores. Make sure to use the ``--batch_size`` and ``--sleep_between_batches_secs`` options to balance system performance against load time.
+
+Sources and Destinations
+------------------------
+
+For most sources and destinations we use `Apache Libcloud Object storage <https://libcloud.readthedocs.io/en/stable/storage/supported_providers.html>`__. This covers everything from local storage to Amazon S3, MinIO, and many other cloud storage solutions. The ``--source_provider`` and ``--destination_provider`` options are the Libcloud provider names, in all caps (e.g. ``S3``, ``LOCAL``, ``MINIO``). The ``--source_config`` and ``--destination_config`` options are JSON strings passed directly to the Libcloud constructor as keyword args.
+
+The ``LRS`` destination provider is a special case that uses the usual event-routing-backends logic for sending events to Caliper and/or xAPI learning record stores.
+
+For the ``LOCAL`` provider, the path to the file(s) is a concatenation of the ``key``, which is the path to a top level directory; a ``container``, which is a single subdirectory name inside the ``key`` directory; and a ``prefix``, which, if provided, is appended to the container to determine the final path.
+
+::
+
+    # This will attempt to recursively read all files in the "/openedx/data/logs/" directory
+    {"key": "/openedx/data", "container": "logs"}
+
+    # This will attempt to recursively read all files in "/openedx/data/logs/tracking/" as
+    # well as any files in "/openedx/data/logs/" that begin with "tracking"
+    {"key": "/openedx/data", "container": "logs", "prefix": "tracking"}
+
+    # This will attempt to read a single file named "/openedx/data/logs/tracking.log"
+    {"key": "/openedx/data", "container": "logs", "prefix": "tracking.log"}
+
+
+For other providers, ``key`` and ``secret`` are authentication credentials and ``container`` is roughly synonymous with an S3 bucket. Configuration for each provider is different; please consult the libcloud docs for your provider to learn about other options you may need to pass in to the ``--source_config`` and ``--destination_config`` JSON structures.
+
+
+Modes Of Operation
+------------------
+
+The command can work in a few distinct ways.
+
+**File(s) to learning record store (LRS)** - This uses the existing event-routing-backends configuration to route any replayed log events to **all** configured LRS backends, just as if the events were being emitted right now. It can be used to backfill old data, capture old events that didn't previously have transforms, or fix up data lost to downtime issues.
+
+**File(s) to file(s)** - This performs the same transformations as usual, but instead of routing the statements to an LRS, it saves them as a file to any libcloud destination. In this mode all events are saved to a single file and no filters are applied.
+
+Additionally, all generated statements are written to a Python logger, which can be configured to be ignored, saved to a file, written to standard out, or sent to a log forwarder such as `Vector <https://vector.dev>`__ for more statement handling options. The two loggers are named ``xapi_tracking`` and ``caliper_tracking``, and are always running; a sample logging configuration is sketched after the warning below.
+
+**File(s) to logger** - For any destination you can use the ``--dry_run`` flag to test finding and transforming data before attempting to store it. Used in conjunction with the loggers mentioned above, this lets you rely on Python log forwarding without the additional overhead of storing full files.
+
+.. warning::
+    Events may be filtered differently by this command than in normal operation. Normally events pass through two layers of filters as described in `getting started `_.
+
+    First, the eventtracking ``AsyncRoutingBackend`` can have processor filters; these are ignored when running this command (since the events have already passed through the eventtracking process).
+
+    Second, the router configuration filters work on a per-LRS basis; these are respected when the destination is an LRS, but ignored when writing to files or to the loggers.
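+For reference, here is a minimal sketch of a Django ``LOGGING`` fragment that captures both
+statement streams to their own files. The handler names and file paths below are illustrative
+assumptions, not defaults of this plugin; only the ``xapi_tracking`` and ``caliper_tracking``
+logger names are fixed:
+
+::
+
+    LOGGING = {
+        "version": 1,
+        "disable_existing_loggers": False,
+        "handlers": {
+            # Hypothetical handlers, use whatever fits your deployment
+            "xapi_file": {"class": "logging.FileHandler", "filename": "/openedx/data/logs/xapi.log"},
+            "caliper_file": {"class": "logging.FileHandler", "filename": "/openedx/data/logs/caliper.log"},
+        },
+        "loggers": {
+            # These two logger names are used by event-routing-backends
+            "xapi_tracking": {"handlers": ["xapi_file"], "level": "INFO", "propagate": False},
+            "caliper_tracking": {"handlers": ["caliper_file"], "level": "INFO", "propagate": False},
+        },
+    }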
+
+
+Examples
+--------
+
+**Files to LRS**
+
+::
+
+    # Transform all events in the local file /openedx/data/logs/tracking.log to all configured LRSs
+    python manage.py lms transform_tracking_logs \
+    --source_provider LOCAL \
+    --source_config '{"key": "/openedx/data/", "prefix": "tracking.log", "container": "logs"}' \
+    --destination_provider LRS \
+    --transformer_type xapi
+
+::
+
+    # Transform all events in the local file /openedx/data/logs/tracking.log to all configured LRSs,
+    # using a smaller batch size and a longer sleep to reduce load on the LMS
+    python manage.py lms transform_tracking_logs \
+    --source_provider LOCAL \
+    --source_config '{"key": "/openedx/data/", "prefix": "tracking.log", "container": "logs"}' \
+    --destination_provider LRS \
+    --transformer_type caliper \
+    --batch_size 1000 \
+    --sleep_between_batches_secs 2.5
+
+::
+
+    # Recursively transform any files whose names start with "tracking" in the S3 bucket
+    # "logs" to all configured LRSs
+    python manage.py lms transform_tracking_logs \
+    --transformer_type xapi \
+    --source_provider S3 \
+    --source_config '{"key": "AWS key", "secret": "AWS secret", "container": "logs", "prefix":"tracking"}' \
+    --destination_provider LRS
+
+**Files to Files**
+
+::
+
+    # Recursively transform any local files whose names start with "tracking" in the
+    # "/openedx/data/logs/" directory to a single file in the MinIO bucket "openedx"
+    python manage.py lms transform_tracking_logs \
+    --source_provider LOCAL \
+    --source_config '{"key": "/openedx/data", "container": "logs", "prefix":"tracking"}' \
+    --destination_provider MINIO --destination_config '{"key": "openedx", "secret": "minio secret key", "container": "openedx", "prefix": "transformed_logs/2023-06-01/", "host": "files.local.overhang.io", "secure": false}' \
+    --transformer_type xapi
+
+::
+
+    # Transform the entire local file /openedx/data/logs/tracking.log to a new file in the local
+    # directory /openedx/data/transformed_logs/2023-06-01/; the file will be named with the current timestamp.
+    # Note: The "container" directory must exist!
+    python manage.py lms transform_tracking_logs \
+    --transformer_type caliper \
+    --source_provider LOCAL \
+    --source_config '{"key": "/openedx/data/", "container": "logs", "prefix": "tracking.log"}' \
+    --destination_provider LOCAL \
+    --destination_config '{"key": "/openedx/data/", "container": "transformed_logs", "prefix": "2023-06-01"}'
+
+::
+
+    # Recursively transform any files whose names start with "tracking" from a "tracking_logs" directory in the
+    # MinIO bucket "openedx" to a single file under the same bucket's "transformed_logs/" prefix.
+    # ie: http://files.local.overhang.io/openedx/tracking_logs/tracking* to http://files.local.overhang.io/openedx/transformed_logs/2023-06-02/23-06-02_20-29-20_xapi.log
+    # This is the configuration for a tutor local environment with MinIO enabled.
+ python manage.py lms transform_tracking_logs \ + --source_provider MINIO \ + --source_config '{"key": "openedx", "secret": "minio secret", "container": "openedx", "prefix": "/tracking_logs", "host": "files.local.overhang.io", "secure": false}' \ + --destination_provider MINIO \ + --destination_config '{"key": "openedx", "secret": "minio secret", "container": "openedx", "prefix": "transformed_logs/2023-06-02/", "host": "files.local.overhang.io", "secure": false}' --transformer_type xapi + diff --git a/event_routing_backends/backends/events_router.py b/event_routing_backends/backends/events_router.py index f63277ca..1e12a33b 100644 --- a/event_routing_backends/backends/events_router.py +++ b/event_routing_backends/backends/events_router.py @@ -7,7 +7,7 @@ from event_routing_backends.helpers import get_business_critical_events from event_routing_backends.models import RouterConfiguration -from event_routing_backends.tasks import dispatch_event, dispatch_event_persistent +from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent logger = logging.getLogger(__name__) @@ -28,7 +28,95 @@ def __init__(self, processors=None, backend_name=None): self.processors = processors if processors else [] self.backend_name = backend_name - def send(self, event): + def configure_host(self, host, router): + """ + Create host_configurations for the given host and router. + """ + host['host_configurations'] = {} + host['host_configurations'].update({'url': router.route_url}) + host['host_configurations'].update({'auth_scheme': router.auth_scheme}) + + if router.auth_scheme == RouterConfiguration.AUTH_BASIC: + host['host_configurations'].update({'username': router.username}) + host['host_configurations'].update({'password': router.password}) + elif router.auth_scheme == RouterConfiguration.AUTH_BEARER: + host['host_configurations'].update({'auth_key': router.auth_key}) + + if router.backend_name == RouterConfiguration.CALIPER_BACKEND: + host.update({'router_type': 'AUTH_HEADERS'}) + if 'headers' in host: + host['host_configurations'].update({'headers': host['headers']}) + elif router.backend_name == RouterConfiguration.XAPI_BACKEND: + host.update({'router_type': 'XAPI_LRS'}) + else: + host.update({'router_type': 'INVALID_TYPE'}) + + return host + + def prepare_to_send(self, events): + """ + Prepare a list of events to be sent and create a processed, filtered batch for each router. + """ + routers = RouterConfiguration.get_enabled_routers(self.backend_name) + business_critical_events = get_business_critical_events() + route_events = {} + + # We continue even without routers here to allow logging of statements to happen. + # If operators do not wish to log and have no enabled routers they should set XAPI_EVENTS_ENABLED + # or CALIPER_EVENTS_ENABLED to false. 
+ if not routers: + logger.debug('Could not find any enabled router configuration for backend %s', self.backend_name) + routers = [] + + for event in events: + try: + event_name = event['name'] + except TypeError as exc: + raise ValueError('Expected event as dict but {type} was given.'.format(type=type(event))) from exc + + try: + logger.debug( + 'Processing edx event "{}" for router with backend {}'.format(event_name, self.backend_name) + ) + + processed_event = self.process_event(event) + except (EventEmissionExit, ValueError): + logger.error( + 'Could not process edx event "%s" for backend %s\'s router', + event_name, + self.backend_name, + exc_info=True + ) + continue + + logger.debug( + 'Successfully processed edx event "%s" for router with backend %s. Processed event: %s', + event_name, + self.backend_name, + processed_event + ) + + for router in routers: + host = router.get_allowed_host(event) + router_pk = router.pk + + if not host: + logger.info( + 'Event %s is not allowed to be sent to any host for router ID %s with backend "%s"', + event_name, router_pk, self.backend_name + ) + else: + host = self.configure_host(host, router) + updated_event = self.overwrite_event_data(processed_event, host, event_name) + is_business_critical = event_name in business_critical_events + if router_pk not in route_events: + route_events[router_pk] = [(event_name, updated_event, host, is_business_critical),] + else: + route_events[router_pk].append((event_name, updated_event, host, is_business_critical)) + + return route_events + + def bulk_send(self, events): """ Send the event to configured routers after processing it. @@ -37,71 +125,39 @@ def send(self, event): the list of hosts to which the event is required to be delivered to. Arguments: - event (dict): original event dictionary + events (list[dict]): list of original event dictionaries """ - routers = RouterConfiguration.get_enabled_routers(self.backend_name) - - if not routers: - logger.info('Could not find any enabled router configuration for backend %s', self.backend_name) - return + event_routes = self.prepare_to_send(events) + + for events_for_route in event_routes.values(): + prepared_events = [] + host = None + for _, updated_event, host, _ in events_for_route: + prepared_events.append(updated_event) + + if prepared_events: # pragma: no cover + dispatch_bulk_events.delay( + prepared_events, + host['router_type'], + host['host_configurations'] + ) - try: - event_name = event['name'] - except TypeError as exc: - raise ValueError('Expected event as dict but {type} was given.'.format(type=type(event))) from exc - try: - logger.debug('Processing edx event "{}" for router with backend {}'.format(event_name, self.backend_name)) + def send(self, event): + """ + Send the event to configured routers after processing it. - processed_event = self.process_event(event) + Event is processed through the configured processors. A router config + object matching the backend_name and other match params is used to get + the list of hosts to which the event is required to be delivered to. - except EventEmissionExit: - logger.error( - 'Could not process edx event "%s" for backend %s\'s router', - event_name, - self.backend_name, - exc_info=True - ) - return - - logger.debug( - 'Successfully processed edx event "%s" for router with backend %s. 
Processed event: %s', - event_name, - self.backend_name, - processed_event - ) - - for router in routers: - host = router.get_allowed_host(event) - - router_url = router.route_url - if not host: - logger.info( - 'Event %s is not allowed to be sent to any host for router %s with backend "%s"', - event_name, router_url, self.backend_name - ) - else: - updated_event = self.overwrite_event_data(processed_event, host, event_name) - host['host_configurations'] = {} - host['host_configurations'].update({'url': router_url}) - host['host_configurations'].update({'auth_scheme': router.auth_scheme}) - - if router.auth_scheme == RouterConfiguration.AUTH_BASIC: - host['host_configurations'].update({'username': router.username}) - host['host_configurations'].update({'password': router.password}) - elif router.auth_scheme == RouterConfiguration.AUTH_BEARER: - host['host_configurations'].update({'auth_key': router.auth_key}) - - if router.backend_name == RouterConfiguration.CALIPER_BACKEND: - host.update({'router_type': 'AUTH_HEADERS'}) - if 'headers' in host: - host['host_configurations'].update({'headers': host['headers']}) - elif router.backend_name == RouterConfiguration.XAPI_BACKEND: - host.update({'router_type': 'XAPI_LRS'}) - else: - host.update({'router_type': 'INVALID_TYPE'}) + Arguments: + event (dict): the original event dictionary + """ + event_routes = self.prepare_to_send([event]) - business_critical_events = get_business_critical_events() - if event_name in business_critical_events: + for events_for_route in event_routes.values(): + for event_name, updated_event, host, is_business_critical in events_for_route: + if is_business_critical: dispatch_event_persistent.delay( event_name, updated_event, diff --git a/event_routing_backends/backends/tests/test_events_router.py b/event_routing_backends/backends/tests/test_events_router.py index d7f61504..b4d8bfa6 100644 --- a/event_routing_backends/backends/tests/test_events_router.py +++ b/event_routing_backends/backends/tests/test_events_router.py @@ -80,6 +80,72 @@ def setUp(self): }, } + self.bulk_sample_events = [ + { + 'name': str(sentinel.name), + 'event_type': 'edx.test.event', + 'time': '2020-01-01T12:12:12.000000+00:00', + 'data': { + 'key': 'value' + }, + 'context': { + 'username': 'testuser' + }, + 'session': '0000' + }, + { + 'name': str(sentinel.name), + 'event_type': 'edx.test.event', + 'time': '2020-01-01T12:12:12.000000+00:01', + 'data': { + 'key': 'value 1' + }, + 'context': { + 'username': 'testuser1' + }, + 'session': '0001' + }, + { + 'name': str(sentinel.name), + 'event_type': 'edx.test.event', + 'time': '2020-01-01T12:12:12.000000+00:02', + 'data': { + 'key': 'value 2' + }, + 'context': { + 'username': 'testuser2' + }, + 'session': '0002' + } + ] + + self.bulk_transformed_events = [ + { + 'name': str(sentinel.name), + 'transformed': True, + 'event_time': '2020-01-01T12:12:12.000000+00:00', + 'data': { + 'key': 'value' + }, + }, + { + 'name': str(sentinel.name), + 'transformed': True, + 'event_time': '2020-01-01T12:12:12.000000+00:01', + 'data': { + 'key': 'value 1' + }, + }, + { + 'name': str(sentinel.name), + 'transformed': True, + 'event_time': '2020-01-01T12:12:12.000000+00:02', + 'data': { + 'key': 'value 2' + }, + } + ] + self.router = EventsRouter(processors=[], backend_name='test') @patch('event_routing_backends.utils.http_client.requests.post') @@ -146,7 +212,7 @@ def test_with_no_router_configurations_available(self, mocked_logger, mocked_pos self.assertIn( call('Could not find any enabled router configuration for 
backend %s', 'test'), - mocked_logger.info.mock_calls + mocked_logger.debug.mock_calls ) @patch('event_routing_backends.utils.http_client.requests.post') @@ -168,6 +234,25 @@ def test_with_unsupported_routing_strategy(self, mocked_logger, mocked_post): mocked_logger.error.assert_called_once_with('Unsupported routing strategy detected: INVALID_TYPE') mocked_post.assert_not_called() + @patch('event_routing_backends.utils.http_client.requests.post') + @patch('event_routing_backends.tasks.logger') + def test_bulk_with_unsupported_routing_strategy(self, mocked_logger, mocked_post): + RouterConfigurationFactory.create( + backend_name='test_backend', + enabled=True, + route_url='http://test3.com', + auth_scheme=RouterConfiguration.AUTH_BEARER, + auth_key='test_key', + configurations=ROUTER_CONFIG_FIXTURE[0] + ) + + router = EventsRouter(processors=[], backend_name='test_backend') + TieredCache.dangerous_clear_all_tiers() + router.bulk_send([self.transformed_event]) + + mocked_logger.error.assert_called_once_with('Unsupported routing strategy detected: INVALID_TYPE') + mocked_post.assert_not_called() + @patch('event_routing_backends.utils.http_client.requests.post') @patch('event_routing_backends.backends.events_router.logger') def test_with_no_available_hosts(self, mocked_logger, mocked_post): @@ -186,8 +271,8 @@ def test_with_no_available_hosts(self, mocked_logger, mocked_post): self.assertIn( call( - 'Event %s is not allowed to be sent to any host for router %s with backend "%s"', - self.transformed_event['name'], router_config.route_url, 'test_backend' + 'Event %s is not allowed to be sent to any host for router ID %s with backend "%s"', + self.transformed_event['name'], router_config.pk, 'test_backend' ), mocked_logger.info.mock_calls ) @@ -223,6 +308,166 @@ def test_generic_exception(self, backend_name, mocked_logger, mocked_post): else: mocked_logger.exception.assert_not_called() + @patch('event_routing_backends.utils.http_client.requests.post') + @patch('event_routing_backends.tasks.logger') + @ddt.unpack + def test_failed_bulk_post(self, mocked_logger, mocked_post): + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.request.method = "POST" + mock_response.text = "Fake Server Error" + + mocked_post.return_value = mock_response + RouterConfigurationFactory.create( + backend_name=RouterConfiguration.CALIPER_BACKEND, + enabled=True, + route_url='http://test3.com', + configurations=ROUTER_CONFIG_FIXTURE[2] + ) + + router = EventsRouter(processors=[], backend_name=RouterConfiguration.CALIPER_BACKEND) + router.bulk_send([self.transformed_event]) + + self.assertEqual(mocked_logger.exception.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + self.assertEqual(mocked_post.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + + @patch('event_routing_backends.utils.http_client.requests.post') + @patch('event_routing_backends.tasks.logger') + @ddt.unpack + def test_failed_post(self, mocked_logger, mocked_post): + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.request.method = "POST" + mock_response.text = "Fake Server Error" + + mocked_post.return_value = mock_response + RouterConfigurationFactory.create( + backend_name=RouterConfiguration.CALIPER_BACKEND, + enabled=True, + route_url='http://test3.com', + configurations=ROUTER_CONFIG_FIXTURE[2] + ) + + router = EventsRouter(processors=[], backend_name=RouterConfiguration.CALIPER_BACKEND) + router.send(self.transformed_event) + + 
self.assertEqual(mocked_logger.exception.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + self.assertEqual(mocked_post.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + + @patch('event_routing_backends.utils.xapi_lrs_client.RemoteLRS') + @patch('event_routing_backends.tasks.logger') + @ddt.unpack + def test_failed_bulk_routing(self, mocked_logger, mocked_remote_lrs): + mock_response = MagicMock() + mock_response.success = False + mock_response.data = "Fake response data" + mock_response.response.code = 500 + mock_response.request.method = "POST" + mock_response.request.content = "Fake request content" + + mocked_remote_lrs.return_value.save_statements.return_value = mock_response + RouterConfigurationFactory.create( + backend_name=RouterConfiguration.XAPI_BACKEND, + enabled=True, + route_url='http://test3.com', + configurations=ROUTER_CONFIG_FIXTURE[2] + ) + + router = EventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) + router.bulk_send([self.transformed_event]) + + self.assertEqual(mocked_logger.exception.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + self.assertEqual(mocked_remote_lrs.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + + @patch('event_routing_backends.utils.xapi_lrs_client.RemoteLRS') + @patch('event_routing_backends.tasks.logger') + @ddt.unpack + def test_failed_routing(self, mocked_logger, mocked_remote_lrs): + mock_response = MagicMock() + mock_response.success = False + mock_response.data = "Fake response data" + mock_response.response.code = 500 + mock_response.request.method = "POST" + mock_response.request.content = "Fake request content" + + mocked_remote_lrs.return_value.save_statement.return_value = mock_response + RouterConfigurationFactory.create( + backend_name=RouterConfiguration.XAPI_BACKEND, + enabled=True, + route_url='http://test3.com', + configurations=ROUTER_CONFIG_FIXTURE[2] + ) + + router = EventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) + router.send(self.transformed_event) + + self.assertEqual(mocked_logger.exception.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + self.assertEqual(mocked_remote_lrs.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + + @patch('event_routing_backends.utils.xapi_lrs_client.RemoteLRS') + @patch('event_routing_backends.tasks.logger') + @ddt.unpack + def test_duplicate_ids_in_bulk(self, mocked_logger, mocked_remote_lrs): + mock_response = MagicMock() + mock_response.success = False + mock_response.data = "Fake response data" + mock_response.response.code = 409 + mock_response.request.method = "POST" + mock_response.request.content = "Fake request content" + + mocked_remote_lrs.return_value.save_statements.return_value = mock_response + RouterConfigurationFactory.create( + backend_name=RouterConfiguration.XAPI_BACKEND, + enabled=True, + route_url='http://test3.com', + configurations=ROUTER_CONFIG_FIXTURE[2] + ) + + router = EventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) + router.bulk_send([self.transformed_event]) + + self.assertEqual(mocked_logger.exception.call_count, 0) + self.assertEqual(mocked_remote_lrs.call_count, 1) + + @ddt.data( + ( + RouterConfiguration.XAPI_BACKEND, + ), + ( + RouterConfiguration.CALIPER_BACKEND, + ) + ) + @patch.dict('event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING', { + 'AUTH_HEADERS': MagicMock(side_effect=EventNotDispatched) + }) + 
@patch('event_routing_backends.utils.http_client.requests.post') + @patch('event_routing_backends.tasks.logger') + @ddt.unpack + def test_bulk_generic_exception(self, backend_name, mocked_logger, mocked_post): + RouterConfigurationFactory.create( + backend_name=backend_name, + enabled=True, + route_url='http://test3.com', + configurations=ROUTER_CONFIG_FIXTURE[2] + ) + + router = EventsRouter(processors=[], backend_name=backend_name) + router.bulk_send([self.transformed_event]) + if backend_name == RouterConfiguration.CALIPER_BACKEND: + self.assertEqual(mocked_logger.exception.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + mocked_post.assert_not_called() + else: + mocked_logger.exception.assert_not_called() + def test_with_non_dict_event(self): RouterConfigurationFactory.create( backend_name=RouterConfiguration.XAPI_BACKEND, @@ -406,3 +651,136 @@ def test_unsuccessful_routing_of_event_http(self): client = HttpClient(**host_configurations) with self.assertRaises(EventNotDispatched): client.send(event=self.transformed_event, event_name=self.transformed_event['name']) + + @ddt.data( + (RouterConfiguration.AUTH_BASIC, + None, + 'abc', + 'xyz', + RouterConfiguration.CALIPER_BACKEND, + 'http://test1.com' + ), + ( + RouterConfiguration.AUTH_BEARER, + 'test_key', + None, + None, + RouterConfiguration.CALIPER_BACKEND, + 'http://test2.com' + ), + ( + None, + None, + None, + None, + RouterConfiguration.CALIPER_BACKEND, + 'http://test3.com' + ), + (RouterConfiguration.AUTH_BASIC, + None, + 'abc', + 'xyz', + RouterConfiguration.XAPI_BACKEND, + 'http://test1.com' + ), + ( + RouterConfiguration.AUTH_BEARER, + 'test_key', + None, + None, + RouterConfiguration.XAPI_BACKEND, + 'http://test2.com' + ), + ( + None, + None, + None, + None, + RouterConfiguration.XAPI_BACKEND, + 'http://test3.com' + ), + ) + @patch('event_routing_backends.utils.http_client.requests.post') + @patch('event_routing_backends.utils.xapi_lrs_client.RemoteLRS') + @ddt.unpack + def test_successful_routing_of_bulk_events( + self, + auth_scheme, + auth_key, + username, + password, + backend_name, + route_url, + mocked_lrs, + mocked_post, + ): + TieredCache.dangerous_clear_all_tiers() + mocked_oauth_client = MagicMock() + mocked_api_key_client = MagicMock() + + MOCKED_MAP = { + 'AUTH_HEADERS': HttpClient, + 'OAUTH2': mocked_oauth_client, + 'API_KEY': mocked_api_key_client, + 'XAPI_LRS': LrsClient, + } + RouterConfigurationFactory.create( + backend_name=backend_name, + enabled=True, + route_url=route_url, + auth_scheme=auth_scheme, + auth_key=auth_key, + username=username, + password=password, + configurations=ROUTER_CONFIG_FIXTURE[0] + ) + + router = EventsRouter(processors=[], backend_name=backend_name) + + with patch.dict('event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING', MOCKED_MAP): + router.bulk_send(self.bulk_transformed_events) + + overridden_events = self.bulk_transformed_events.copy() + + for event in overridden_events: + event['new_key'] = 'new_value' + + if backend_name == RouterConfiguration.XAPI_BACKEND: + # test LRS Client + mocked_lrs().save_statements.assert_has_calls([ + call(overridden_events), + ]) + else: + # test the HTTP client + if auth_scheme == RouterConfiguration.AUTH_BASIC: + mocked_post.assert_has_calls([ + call( + url=route_url, + json=overridden_events, + headers={ + }, + auth=(username, password) + ), + ]) + elif auth_scheme == RouterConfiguration.AUTH_BEARER: + mocked_post.assert_has_calls([ + call( + url=route_url, + json=overridden_events, + headers={ + 'Authorization': 
RouterConfiguration.AUTH_BEARER + ' ' + auth_key
+                        }
+                    ),
+                ])
+            else:
+                mocked_post.assert_has_calls([
+                    call(
+                        url=route_url,
+                        json=overridden_events,
+                        headers={
+                        },
+                    ),
+                ])
+
+        # test mocked oauth client
+        mocked_oauth_client.assert_not_called()
diff --git a/event_routing_backends/helpers.py b/event_routing_backends/helpers.py
index bea26ac1..0ef3d92d 100644
--- a/event_routing_backends/helpers.py
+++ b/event_routing_backends/helpers.py
@@ -146,7 +146,12 @@ def get_course_from_id(course_id):
         Course
     """
     course_key = CourseKey.from_string(course_id)
-    return get_course_overviews([course_key])[0]
+    course_overviews = get_course_overviews([course_key])
+    if course_overviews:
+        return course_overviews[0]
+    return {
+        "display_name": "Unknown Course",
+    }
 
 
 def convert_seconds_to_iso(seconds):
diff --git a/event_routing_backends/management/__init__.py b/event_routing_backends/management/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/event_routing_backends/management/commands/__init__.py b/event_routing_backends/management/commands/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/event_routing_backends/management/commands/helpers/__init__.py b/event_routing_backends/management/commands/helpers/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/event_routing_backends/management/commands/helpers/event_log_parser.py b/event_routing_backends/management/commands/helpers/event_log_parser.py
new file mode 100644
index 00000000..eecf9b91
--- /dev/null
+++ b/event_routing_backends/management/commands/helpers/event_log_parser.py
@@ -0,0 +1,56 @@
+"""
+Support for reading tracking event logs.
+
+Taken entirely from edx-analytics-pipeline.
+"""
+import json
+import logging
+import re
+from json.decoder import JSONDecodeError
+
+log = logging.getLogger(__name__)
+
+PATTERN_JSON = re.compile(r'^.*?(\{.*\})\s*$')
+
+
+def parse_json_event(line):
+    """
+    Parse a tracking log input line as JSON to create a dict representation.
+
+    Arguments:
+    * line: the eventlog text
+    """
+    try:
+        json_match = PATTERN_JSON.match(line)
+        parsed = json.loads(json_match.group(1))
+
+        # The representation of an event that event-routing-backends receives
+        # from the async sender is significantly different from the one that
+        # is saved to tracking log files, for reasons lost to history.
+        # This section of code attempts to format the event line to match the
+        # async version.
+
+        try:
+            # The async version uses "data" for what the log file calls "event".
+            # Sometimes "event" is a nested string of JSON that needs to be parsed.
+            parsed["data"] = json.loads(parsed["event"])
+        except (TypeError, JSONDecodeError):
+            # If it's a TypeError then the "event" was not a string to be parsed,
+            # so it is probably already a dict. If it's a JSONDecodeError that means
+            # the "event" was a string, but not JSON. Either way we just pass the
+            # value back, since all of those are valid.
+            parsed["data"] = parsed["event"]
+
+        # The async version of tracking logs seems to use "timestamp" for this key,
+        # while the log file uses "time". We normalize it here.
+ if "timestamp" not in parsed and "time" in parsed: + parsed["timestamp"] = parsed["time"] + + return parsed + except (AttributeError, JSONDecodeError, KeyError) as e: + log.error("EXCEPTION!!!") + log.error(type(e)) + log.error(e) + log.error(line) + + return None diff --git a/event_routing_backends/management/commands/helpers/queued_sender.py b/event_routing_backends/management/commands/helpers/queued_sender.py new file mode 100644 index 00000000..d6ddf5c0 --- /dev/null +++ b/event_routing_backends/management/commands/helpers/queued_sender.py @@ -0,0 +1,169 @@ +""" +Class to handle batching and sending bulk transformed statements. +""" +import datetime +import json +import os +from io import BytesIO +from time import sleep + +from event_routing_backends.backends.events_router import EventsRouter +from event_routing_backends.management.commands.helpers.event_log_parser import parse_json_event +from event_routing_backends.models import RouterConfiguration +from event_routing_backends.processors.caliper.transformer_processor import CaliperProcessor +from event_routing_backends.processors.xapi.transformer_processor import XApiProcessor + + +class QueuedSender: + """ + Handles queuing and sending events to the destination. + """ + def __init__( + self, + destination, + destination_container, + destination_prefix, + transformer_type, + max_queue_size=10000, + sleep_between_batches_secs=1.0, + dry_run=False + ): + self.destination = destination + self.destination_container = destination_container + self.destination_prefix = destination_prefix + self.transformer_type = transformer_type + self.event_queue = [] + self.max_queue_size = max_queue_size + self.sleep_between_batches = sleep_between_batches_secs + self.dry_run = dry_run + + # Bookkeeping + self.queued_lines = 0 + self.skipped_lines = 0 + self.unparsable_lines = 0 + self.batches_sent = 0 + + if self.transformer_type == "xapi": + self.router = EventsRouter( + backend_name=RouterConfiguration.XAPI_BACKEND, + processors=[XApiProcessor()] + ) + else: + self.router = EventsRouter( + backend_name=RouterConfiguration.CALIPER_BACKEND, + processors=[CaliperProcessor()] + ) + + def is_known_event(self, event): + """ + Check whether any processor cares about this event. + """ + if "name" in event: + for processor in self.router.processors: + if event["name"] in processor.registry.mapping: + return True + return False + + def transform_and_queue(self, line): + """ + Queue the JSON representation of this log line, if valid and known to any processor. + """ + event = parse_json_event(line) + + if not event: + self.unparsable_lines += 1 + return + + if not self.is_known_event(event): + self.skipped_lines += 1 + return + + self.queue(event) + self.queued_lines += 1 + + def queue(self, event): + """ + Add an event to the queue, try to send if we've reached our batch size. + """ + self.event_queue.append(event) + if len(self.event_queue) == self.max_queue_size: + if self.dry_run: + print("Dry run, skipping, but still clearing the queue.") + else: + print(f"Max queue size of {self.max_queue_size} reached, sending.") + if self.destination == "LRS": + self.send() + else: + self.store() + + self.batches_sent += 1 + self.event_queue.clear() + sleep(self.sleep_between_batches) + + def send(self): + """ + Send to the LRS if we're configured for that, otherwise a no-op. + + Events are converted to the output xAPI / Caliper format in the router. 
+ """ + if self.destination == "LRS": + print(f"Sending {len(self.event_queue)} events to LRS...") + self.router.bulk_send(self.event_queue) + else: + print("Skipping send, we're storing with libcloud instead of an LRS.") + + def store(self): + """ + Store to a libcloud destination if we're configured for that. + + Events are converted to the output xAPI / Caliper format here before being saved. + """ + if self.destination == "LRS": + print("Store is being called on an LRS destination, skipping.") + return + + display_path = os.path.join(self.destination_container, self.destination_prefix.lstrip("/")) + print(f"Storing {len(self.event_queue)} events to libcloud destination {display_path}") + + container = self.destination.get_container(self.destination_container) + + datestr = datetime.datetime.now().strftime('%y-%m-%d_%H-%M-%S') + object_name = f"{self.destination_prefix}/{datestr}_{self.transformer_type}.log" + print(f"Writing to {self.destination_container}/{object_name}") + + out = BytesIO() + for event in self.event_queue: + transformed_event = self.router.processors[0](event) + out.write(str.encode(json.dumps(transformed_event))) + out.write(str.encode("\n")) + out.seek(0) + + self.destination.upload_object_via_stream( + out, + container, + object_name + ) + + def finalize(self): + """ + Send a last batch of events via the LRS, or store a complete set of events to a libcloud destination. + """ + print(f"Finalizing {len(self.event_queue)} events to {self.destination}") + if not self.queued_lines: + print("Nothing in the queue to store!") + elif self.dry_run: + print("Dry run, skipping final storage.") + else: + # One final send, in case there are events left in the queue + if self.destination is None or self.destination == "LRS": + print("Sending to LRS!") + self.send() + else: + print("Storing via Libcloud!") + self.store() + self.batches_sent += 1 + + print(f"Queued {self.queued_lines} log lines, " + f"could not parse {self.unparsable_lines} log lines, " + f"skipped {self.skipped_lines} log lines, " + f"sent {self.batches_sent} batches.") diff --git a/event_routing_backends/management/commands/tests/fixtures/tracking.log b/event_routing_backends/management/commands/tests/fixtures/tracking.log new file mode 100644 index 00000000..9954ea42 --- /dev/null +++ b/event_routing_backends/management/commands/tests/fixtures/tracking.log @@ -0,0 +1,12 @@ +2023-05-23 13:53:13,461 INFO 20 [tracking] [user None] [ip None] logger.py:41 - {"name": "edx.grades.subsection.grade_calculated", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "", "session": "", "ip": "", "agent": "", "host": "", "referer": "", "accept_language": "", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "block_id": "block-v1:edX+DemoX+Demo_Course+type@sequential+block@19a30717eff543078a5d94ae9d6c18a5", "course_version": "6452821a39fb48e67c1a860c", "weighted_total_earned": 3.0, "weighted_total_possible": 3.0, "weighted_graded_earned": 3.0, "weighted_graded_possible": 3.0, "first_attempted": "2023-05-12 20:55:53.096285+00:00", "subtree_edited_timestamp": "2023-05-03 15:47:38.629000+00:00", "event_transaction_id": "45acd5c9-af65-45bd-82fe-313fa5bf9f79", "event_transaction_type": "edx.grades.problem.submitted", "visible_blocks_hash": "5RUTrtbgdsnoSKsFavs24Om8Yp0="}, "time": "2023-05-23T13:53:13.460372+00:00", "event_type": "edx.grades.subsection.grade_calculated", "event_source": "server", "page": null} +2023-05-23 13:53:13,477 INFO 20 
[tracking] [user None] [ip None] logger.py:41 - {"name": "edx.grades.course.grade_calculated", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "", "session": "", "ip": "", "agent": "", "host": "", "referer": "", "accept_language": "", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "course_version": "6452821a39fb48e67c1a860c", "percent_grade": 0.03, "letter_grade": "", "course_edited_timestamp": "2023-05-03 15:47:38.629000+00:00", "event_transaction_id": "45acd5c9-af65-45bd-82fe-313fa5bf9f79", "event_transaction_type": "edx.grades.problem.submitted", "grading_policy_hash": "2HDb6cWz6xgUQ8b9YhsgN5sukP0="}, "time": "2023-05-23T13:53:13.475692+00:00", "event_type": "edx.grades.course.grade_calculated", "event_source": "server", "page": null} +2023-05-23 13:53:13,495 INFO 20 [tracking] [user None] [ip None] logger.py:41 - {"name": "edx.course.grade.now_failed", "context": "course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "", "session": "", "ip": "", "agent": "", "host": "", "referer": "", "accept_language": "", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "event_transaction_id": "45acd5c9-af65-45bd-82fe-313fa5bf9f79", "event_transaction_type": "edx.grades.problem.submitted"}, "time": "2023-05-23T13:53:13.486905+00:00", "event_type": "edx.course.grade.now_failed", "event_source": "server", "page": null} +2023-05-23 14:12:17,290 INFO 27 [tracking] [user 6] [ip 172.18.0.1] logger.py:41 - {"name": "/courses/course-v1:edX+DemoX+Demo_Course/xblock/block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a/handler/xmodule_handler/problem_check", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "course_user_tags": {}, "user_id": 6, "path": "/courses/course-v1:edX+DemoX+Demo_Course/xblock/block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a/handler/xmodule_handler/problem_check", "org_id": "edX", "enterprise_uuid": ""}, "username": "bmtcril", "session": "79e015634b37255d8cb5fcdef0a73ad4", "ip": "172.18.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/113.0", "host": "local.overhang.io", "referer": "http://local.overhang.io/xblock/block-v1:edX+DemoX+Demo_Course+type@vertical+block@134df56c516a4a0dbb24dd5facef746e?show_title=0&show_bookmark_button=0&recheck_access=1&view=student_view", "accept_language": "en-US,en;q=0.5", "event": "{\"GET\": {}, \"POST\": {\"input_932e6f2ce8274072a355a94560216d1a_2_1\": [\"choice_2\"]}}", "time": "2023-05-23T14:12:17.286772+00:00", "event_type": "/courses/course-v1:edX+DemoX+Demo_Course/xblock/block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a/handler/xmodule_handler/problem_check", "event_source": "server", "page": null} +2023-05-23 14:12:17,308 INFO 7 [tracking] [user 6] [ip 172.18.0.1] logger.py:41 - {"name": "problem_check", "context": {"user_id": 6, "path": "/event", "course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "bmtcril", "session": "79e015634b37255d8cb5fcdef0a73ad4", "ip": "172.18.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/113.0", "host": "local.overhang.io", "referer": 
"http://local.overhang.io/xblock/block-v1:edX+DemoX+Demo_Course+type@vertical+block@134df56c516a4a0dbb24dd5facef746e?show_title=0&show_bookmark_button=0&recheck_access=1&view=student_view", "accept_language": "en-US,en;q=0.5", "event": "input_932e6f2ce8274072a355a94560216d1a_2_1=choice_2", "time": "2023-05-23T14:12:17.299491+00:00", "event_type": "problem_check", "event_source": "browser", "page": "http://local.overhang.io/xblock/block-v1:edX+DemoX+Demo_Course+type@vertical+block@134df56c516a4a0dbb24dd5facef746e?show_title=0&show_bookmark_button=0&recheck_access=1&view=student_view"} +2023-05-23 14:12:17,388 INFO 27 [tracking] [user 6] [ip 172.18.0.1] logger.py:41 - {"name": "edx.grades.problem.submitted", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "course_user_tags": {}, "user_id": 6, "path": "/courses/course-v1:edX+DemoX+Demo_Course/xblock/block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a/handler/xmodule_handler/problem_check", "org_id": "edX", "enterprise_uuid": "", "module": {"display_name": "Perchance to Dream", "usage_key": "block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a"}}, "username": "bmtcril", "session": "79e015634b37255d8cb5fcdef0a73ad4", "ip": "172.18.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/113.0", "host": "local.overhang.io", "referer": "http://local.overhang.io/xblock/block-v1:edX+DemoX+Demo_Course+type@vertical+block@134df56c516a4a0dbb24dd5facef746e?show_title=0&show_bookmark_button=0&recheck_access=1&view=student_view", "accept_language": "en-US,en;q=0.5", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "problem_id": "block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a", "event_transaction_id": "aee73a98-8bf3-42ca-99a5-ac008eb62c1d", "event_transaction_type": "edx.grades.problem.submitted", "weighted_earned": 1, "weighted_possible": 1}, "time": "2023-05-23T14:12:17.388051+00:00", "event_type": "edx.grades.problem.submitted", "event_source": "server", "page": null} +2023-05-23 14:12:17,402 INFO 27 [tracking] [user 6] [ip 172.18.0.1] logger.py:41 - {"name": "problem_check", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "course_user_tags": {}, "user_id": 6, "path": "/courses/course-v1:edX+DemoX+Demo_Course/xblock/block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a/handler/xmodule_handler/problem_check", "org_id": "edX", "enterprise_uuid": "", "module": {"display_name": "Perchance to Dream", "usage_key": "block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a"}, "asides": {}}, "username": "bmtcril", "session": "79e015634b37255d8cb5fcdef0a73ad4", "ip": "172.18.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/113.0", "host": "local.overhang.io", "referer": "http://local.overhang.io/xblock/block-v1:edX+DemoX+Demo_Course+type@vertical+block@134df56c516a4a0dbb24dd5facef746e?show_title=0&show_bookmark_button=0&recheck_access=1&view=student_view", "accept_language": "en-US,en;q=0.5", "event": {"state": {"seed": 1, "student_answers": {"932e6f2ce8274072a355a94560216d1a_2_1": "choice_2"}, "has_saved_answers": false, "correct_map": {"932e6f2ce8274072a355a94560216d1a_2_1": {"correctness": "correct", "npoints": null, "msg": "", "hint": "", "hintmode": null, "queuestate": null, "answervariable": null}}, "input_state": {"932e6f2ce8274072a355a94560216d1a_2_1": {}}, "done": true}, 
"problem_id": "block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a", "answers": {"932e6f2ce8274072a355a94560216d1a_2_1": "choice_2"}, "grade": 1, "max_grade": 1, "correct_map": {"932e6f2ce8274072a355a94560216d1a_2_1": {"correctness": "correct", "npoints": null, "msg": "", "hint": "", "hintmode": null, "queuestate": null, "answervariable": null}}, "success": "correct", "attempts": 3, "submission": {"932e6f2ce8274072a355a94560216d1a_2_1": {"question": "", "answer": "There is an implication that the strangeness to follow can be considered like a dream.", "response_type": "multiplechoiceresponse", "input_type": "choicegroup", "correct": true, "variant": "", "group_label": ""}}}, "time": "2023-05-23T14:12:17.398107+00:00", "event_type": "problem_check", "event_source": "server", "page": "x_module"} +2023-05-23 14:12:17,457 INFO 7 [tracking] [user 6] [ip 172.18.0.1] logger.py:41 - {"name": "problem_graded", "context": {"user_id": 6, "path": "/event", "course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "bmtcril", "session": "79e015634b37255d8cb5fcdef0a73ad4", "ip": "172.18.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/113.0", "host": "local.overhang.io", "referer": "http://local.overhang.io/xblock/block-v1:edX+DemoX+Demo_Course+type@vertical+block@134df56c516a4a0dbb24dd5facef746e?show_title=0&show_bookmark_button=0&recheck_access=1&view=student_view", "accept_language": "en-US,en;q=0.5", "event": ["input_932e6f2ce8274072a355a94560216d1a_2_1=choice_2", "\n\n\n\n

[... escaped XHTML of the "Perchance to Dream" multiple choice problem (question text, choices, "Correct (1/1 point)" status, and help text) elided ...]
\n\n\n"], "time": "2023-05-23T14:12:17.457001+00:00", "event_type": "problem_graded", "event_source": "browser", "page": "http://local.overhang.io/xblock/block-v1:edX+DemoX+Demo_Course+type@vertical+block@134df56c516a4a0dbb24dd5facef746e?show_title=0&show_bookmark_button=0&recheck_access=1&view=student_view"} +2023-05-23 14:12:19,621 INFO 42 [tracking] [user None] [ip None] logger.py:41 - {"name": "edx.grades.subsection.grade_calculated", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "", "session": "", "ip": "", "agent": "", "host": "", "referer": "", "accept_language": "", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "block_id": "block-v1:edX+DemoX+Demo_Course+type@sequential+block@19a30717eff543078a5d94ae9d6c18a5", "course_version": "6452821a39fb48e67c1a860c", "weighted_total_earned": 3.0, "weighted_total_possible": 3.0, "weighted_graded_earned": 3.0, "weighted_graded_possible": 3.0, "first_attempted": "2023-05-12 20:55:53.096285+00:00", "subtree_edited_timestamp": "2023-05-03 15:47:38.629000+00:00", "event_transaction_id": "aee73a98-8bf3-42ca-99a5-ac008eb62c1d", "event_transaction_type": "edx.grades.problem.submitted", "visible_blocks_hash": "5RUTrtbgdsnoSKsFavs24Om8Yp0="}, "time": "2023-05-23T14:12:19.620891+00:00", "event_type": "edx.grades.subsection.grade_calculated", "event_source": "server", "page": null} +2023-05-23 14:12:19,621 INFO 42 [tracking] [user None] [ip None] logger.py:41 - {"name": "edx.grades.subsection.grade_calculated2", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "", "session": "", "ip": "", "agent": "", "host": "", "referer": "", "accept_language": "", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "block_id": "block-v1:edX+DemoX+Demo_Course+type@sequential+block@19a30717eff543078a5d94ae9d6c18a5", "course_version": "6452821a39fb48e67c1a860c", "weighted_total_earned": 3.0, "weighted_total_possible": 3.0, "weighted_graded_earned": 3.0, "weighted_graded_possible": 3.0, "first_attempted": "2023-05-12 20:55:53.096285+00:00", "subtree_edited_timestamp": "2023-05-03 15:47:38.629000+00:00", "event_transaction_id": "aee73a98-8bf3-42ca-99a5-ac008eb62c1d", "event_transaction_type": "edx.grades.problem.submitted", "visible_blocks_hash": "5RUTrtbgdsnoSKsFavs24Om8Yp0="}, "timestamp": "2023-05-23T14:12:19.620891+00:00", "event_type": "edx.grades.subsection.grade_calculated", "event_source": "server", "page": null} +2023-05-23 14:12:19,631 INFO 42 [tracking] [user None] [ip None] logger.py:41 - {"name": "edx.grades.course.grade_calculated", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "", "session": "", "ip": "", "agent": "", "host": "", "referer": "", "accept_language": "", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "course_version": "6452821a39fb48e67c1a860c", "percent_grade": 0.03, "letter_grade": "", "course_edited_timestamp": "2023-05-03 15:47:38.629000+00:00", "event_transaction_id": "aee73a98-8bf3-42ca-99a5-ac008eb62c1d", "event_transaction_type": "eoh no this line is broken +2023-05-23 14:12:19,641 INFO 42 [tracking] [user None] [ip None] logger.py:41 - {"name": "edx.course.grade.now_failed", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "", "session": "", "ip": "", "agent": "", "host": "", "referer": "", 
"accept_language": "", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "event_transaction_id": "aee73a98-8bf3-42ca-99a5-ac008eb62c1d", "event_transaction_type": "edx.grades.problem.submitted"}, "time": "2023-05-23T14:12:19.635349+00:00", "event_type": "edx.course.grade.now_failed", "event_source": "server", "page": null} diff --git a/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py new file mode 100644 index 00000000..10c8791b --- /dev/null +++ b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py @@ -0,0 +1,396 @@ +""" +Tests for the transform_tracking_logs management command. +""" +import json +import os +from unittest.mock import MagicMock, patch + +import pytest +from django.core.management import call_command +from libcloud.storage.types import ContainerDoesNotExistError + +import event_routing_backends.management.commands.transform_tracking_logs as transform_tracking_logs +from event_routing_backends.management.commands.helpers.queued_sender import QueuedSender +from event_routing_backends.management.commands.transform_tracking_logs import ( + get_dest_config_from_options, + get_libcloud_drivers, + get_source_config_from_options, + validate_source_and_files, +) + +LOCAL_CONFIG = json.dumps({"key": "/openedx/", "container": "data", "prefix": ""}) +REMOTE_CONFIG = json.dumps({ + "key": "api key", + "secret": "api secret key", + "prefix": "/xapi_statements/", + "container": "test_bucket", + "secure": False, + "host": "127.0.0.1", + "port": 9191 +}) + + +@pytest.fixture +def mock_common_calls(): + """ + Mock out calls that we test elsewhere and aren't relevant to the command tests. + """ + command_path = "event_routing_backends.management.commands.transform_tracking_logs" + helper_path = "event_routing_backends.management.commands.helpers" + with patch(command_path+".Provider") as mock_libcloud_provider: + with patch(command_path+".get_driver") as mock_libcloud_get_driver: + with patch(helper_path + ".queued_sender.EventsRouter") as mock_eventsrouter: + yield mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter + + +def command_options(): + """ + A fixture of different command options and their expected outputs. + """ + options = [ + # Local file to LRS, small batch size to test batching + { + "transformer_type": "xapi", + "source_provider": "LOCAL", + "source_config": LOCAL_CONFIG, + "batch_size": 1, + "sleep_between_batches_secs": 0, + "chunk_size": 1024, # We use this to override the default size of bytes to download + "expected_results": { + "expected_batches_sent": 2, + "log_lines": [ + "Looking for log files in data/*", + "Max queue size of 1 reached, sending.", + "Sending 1 events to LRS...", + "Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 3 batches.", + "Sending to LRS!" 
+ ] + }, + }, + # Remote file to LRS dry run no batch size + { + "transformer_type": "xapi", + "source_provider": "MINIO", + "source_config": REMOTE_CONFIG, + "sleep_between_batches_secs": 0, + "dry_run": True, + "expected_results": { + # Dry run, nothing should be sent + "expected_batches_sent": 0, + "log_lines": [ + "Looking for log files in test_bucket/xapi_statements/*", + "Finalizing 2 events to LRS", + "Dry run, skipping final storage.", + "Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 0 batches.", + ] + }, + }, + # Remote file to LRS, default batch size + { + "transformer_type": "xapi", + "source_provider": "MINIO", + "source_config": REMOTE_CONFIG, + "sleep_between_batches_secs": 0, + "expected_results": { + # No batch size given, default is 10k so only one batch sent + "expected_batches_sent": 1, + "log_lines": [ + "Looking for log files in test_bucket/xapi_statements/*", + "Finalizing 2 events to LRS", + "Sending to LRS!", + "Sending 2 events to LRS...", + "Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 1 batches.", + ] + }, + }, + # Remote file to remote file + { + "transformer_type": "xapi", + "source_provider": "MINIO", + "source_config": REMOTE_CONFIG, + "destination_provider": "MINIO", + "destination_config": REMOTE_CONFIG, + "batch_size": 2, + "sleep_between_batches_secs": 0, + "expected_results": { + # Remote files only get written once + "expected_batches_sent": 1, + "log_lines": [ + "Looking for log files in test_bucket/xapi_statements/*", + "Finalizing 0 events to", + "Storing via Libcloud!", + "Max queue size of 2 reached, sending.", + "Storing 2 events to libcloud destination test_bucket/xapi_statements/", + "Storing 0 events to libcloud destination test_bucket/xapi_statements/", + "Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 2 batches.", + ] + }, + }, + # Remote file dry run + { + "transformer_type": "xapi", + "source_provider": "MINIO", + "source_config": REMOTE_CONFIG, + "destination_provider": "MINIO", + "destination_config": REMOTE_CONFIG, + "batch_size": 1, + "dry_run": True, + "sleep_between_batches_secs": 0, + "expected_results": { + # Dry run, nothing should be sent + "expected_batches_sent": 0, + "log_lines": [ + "Looking for log files in test_bucket/xapi_statements/*", + "Finalizing 0 events to", + "Dry run, skipping, but still clearing the queue.", + "Dry run, skipping final storage.", + "Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 0 batches.", + ] + }, + }, + ] + + for option in options: + yield option + + +def _get_tracking_log_file_path(): + TEST_DIR_PATH = os.path.dirname(os.path.abspath(__file__)) + return '{test_dir}/fixtures/tracking.log'.format(test_dir=TEST_DIR_PATH) + + +def _get_raw_log_size(): + tracking_log_path = _get_tracking_log_file_path() + return os.path.getsize(tracking_log_path) + + +def _get_raw_log_stream(_, start_bytes, end_bytes): + """ + Return raw event json parsed from current fixtures. + """ + tracking_log_path = _get_tracking_log_file_path() + with open(tracking_log_path, "rb") as current: + current.seek(start_bytes) + yield current.read(end_bytes - start_bytes) + + +@pytest.mark.parametrize("command_opts", command_options()) +def test_transform_command(command_opts, mock_common_calls, caplog, capsys): + """ + Test the command and QueuedSender with a variety of options. 
+ """ + mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter = mock_common_calls + + expected_results = command_opts.pop("expected_results") + transform_tracking_logs.CHUNK_SIZE = command_opts.pop("chunk_size", 1024*1024*2) + + mm = MagicMock() + + mock_log_object = MagicMock() + mock_log_object.__str__.return_value = "tracking.log" + mock_log_object.name = "tracking.log" + mock_log_object.size = _get_raw_log_size() + + # Fake finding one log file in each container, it will be loaded and parsed twice + mm.return_value.iterate_container_objects.return_value = [mock_log_object] + mm.return_value.download_object_range_as_stream = _get_raw_log_stream + mock_libcloud_get_driver.return_value = mm + + mm2 = MagicMock() + # Fake a router mapping so some events in the log are actually processed + mm2.registry.mapping = {"problem_check": 1} + # Fake a process response that can be serialized to json + mm2.return_value = {"foo": "bar"} + mock_eventsrouter.return_value.processors = [mm2] + + call_command( + 'transform_tracking_logs', + **command_opts + ) + + # Router should only be set up once + assert mock_eventsrouter.call_count == 1 + + captured = capsys.readouterr() + print(captured.out) + + # Log statements we always expect with this configuration + assert "Streaming file tracking.log..." in captured.out + + # There are intentionally broken log statements in the test file that cause these + # lines to be emitted. + assert "EXCEPTION!!!" in caplog.text + assert "'NoneType' object has no attribute 'group'" in caplog.text + assert "Expecting ',' delimiter: line 1 column 63 (char 62)" in caplog.text + + # Check the specific expected log lines for this set of options + for line in expected_results["log_lines"]: + assert line in caplog.text or line in captured.out + + +def test_queued_sender_store_on_lrs(mock_common_calls, capsys): + """ + Test that we don't attempt to store on an LRS backend. + """ + qs = QueuedSender("LRS", "fake_container", None, "xapi") + qs.store() + + captured = capsys.readouterr() + print(captured.out) + assert "Store is being called on an LRS destination, skipping." in captured.out + + +def test_queued_sender_broken_event(mock_common_calls, capsys): + """ + Test that we don't attempt to store on an LRS backend. + """ + qs = QueuedSender("LRS", "fake_container", None, "xapi") + assert not qs.is_known_event({"this has no name key and will fail": 1}) + + +def test_queued_sender_store_empty_queue(mock_common_calls, capsys): + """ + Test that we don't attempt to store() when there's nothing in the queue. + """ + qs = QueuedSender("NOT LRS", "fake_container", None, "xapi") + qs.finalize() + + captured = capsys.readouterr() + print(captured.out) + assert "Nothing in the queue to store!" in captured.out + + +def test_queued_sender_send_on_libcloud(mock_common_calls, capsys): + """ + Test that we don't attempt to send() when using a libcloud backend. + """ + qs = QueuedSender("NOT LRS", "fake_container", None, "caliper") + qs.send() + + captured = capsys.readouterr() + print(captured.out) + assert "Skipping send, we're storing with libcloud instead of an LRS." in captured.out + + +def test_queued_sender_container_does_not_exist(mock_common_calls, caplog): + """ + Test that we raise an exception if a container doesn't exist. 
+ """ + mock_destination = MagicMock() + mock_destination.get_container.side_effect = ContainerDoesNotExistError( + "Container 'fake_container' doesn't exist.", None, "fake") + with pytest.raises(ContainerDoesNotExistError): + qs = QueuedSender(mock_destination, "fake_container", "fake_prefix", "xapi") + qs.queued_lines = ["fake"] + qs.store() + + +def test_invalid_libcloud_source_driver(capsys, mock_common_calls): + """ + Check error cases when non-existent libcloud drivers are passed in. + """ + mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter = mock_common_calls + + mock_libcloud_get_driver.side_effect = [AttributeError(), MagicMock()] + + with pytest.raises(AttributeError): + get_libcloud_drivers("I should fail", {}, "I should never get called", {}) + + captured = capsys.readouterr() + print(captured.out) + assert "is not a valid source Libcloud provider." in captured.out + + +def test_invalid_libcloud_dest_driver(capsys, mock_common_calls): + mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter = mock_common_calls + + mock_libcloud_get_driver.side_effect = [MagicMock(), AttributeError()] + with pytest.raises(AttributeError): + get_libcloud_drivers("I should succeed", {}, "I should fail", {}) + + captured = capsys.readouterr() + print(captured.out) + assert "is not a valid destination Libcloud provider." in captured.out + + +def test_no_files_in_source_dir(caplog): + """ + Check error case when there are no source files found in the libcloud source. + """ + fake_driver = MagicMock() + fake_driver.iterate_container_objects.return_value = [] + with pytest.raises(FileNotFoundError): + validate_source_and_files(fake_driver, "container name", "prefix") + + +def test_required_source_libcloud_keys(capsys): + """ + Check that we raise an error if the container and prefix aren't given. + """ + with pytest.raises(KeyError): + get_source_config_from_options("{}") + + captured = capsys.readouterr() + print(captured.out) + assert "The following keys must be defined in source_config: 'prefix', 'container'" in captured.out + + +def test_required_dest_libcloud_keys(capsys): + """ + Check that we raise an error if the container and prefix aren't given in a non-LRS destination. + """ + with pytest.raises(KeyError): + get_dest_config_from_options(None, "{}") + + captured = capsys.readouterr() + print(captured.out) + assert "If not using the 'LRS' destination, the following keys must be defined in destination_config: " \ + "'prefix', 'container'" in captured.out + + +def test_get_source_config(): + """ + Check that our special keys are popped off the options when retrieving the source config. + """ + options = { + "key": "fake test key", + "container": "fake container", + "prefix": "fake prefix" + } + + config, container, prefix = get_source_config_from_options(json.dumps(options)) + + assert len(config) == 1 + assert config["key"] == options["key"] + assert container == "fake container" + assert prefix == "fake prefix" + + +def test_get_dest_config(): + """ + Check that our special keys are popped off the options when retrieving the non-LRS destination config. 
+ """ + options = { + "key": "fake test key", + "container": "fake container", + "prefix": "fake prefix" + } + + config, container, prefix = get_dest_config_from_options("fake provider", json.dumps(options)) + + assert len(config) == 1 + assert config["key"] == options["key"] + assert container == "fake container" + assert prefix == "fake prefix" + + +def test_get_dest_config_lrs(): + """ + Check that an LRS destination config returns appropriate values. + """ + options = {} + + config, container, prefix = get_dest_config_from_options("LRS", options) + assert config is None + assert container is None + assert prefix is None diff --git a/event_routing_backends/management/commands/transform_tracking_logs.py b/event_routing_backends/management/commands/transform_tracking_logs.py new file mode 100644 index 00000000..c35a0849 --- /dev/null +++ b/event_routing_backends/management/commands/transform_tracking_logs.py @@ -0,0 +1,272 @@ +""" +Management command for transforming tracking log files. +""" +import json +import os +from io import BytesIO +from textwrap import dedent + +from django.core.management.base import BaseCommand +from libcloud.storage.providers import get_driver +from libcloud.storage.types import Provider + +from event_routing_backends.management.commands.helpers.queued_sender import QueuedSender + +# Number of bytes to download at a time, this is 2 MB +CHUNK_SIZE = 1024 * 1024 * 2 + + +def transform_tracking_logs( + source, + source_container, + source_prefix, + sender +): + """ + Transform one or more tracking log files from the given source to the given destination. + """ + # Containers are effectively directories, this recursively tries to find files + # matching the given prefix in the given source. + container = source.get_container(container_name=source_container) + + display_path = os.path.join(source_container, source_prefix.lstrip("/")) + print(f"Looking for log files in {display_path}*") + + for file in source.iterate_container_objects(container, source_prefix): + # Download the file as a stream of characters to save on memory + print(f"Streaming file {file}...") + + last_successful_byte = 0 + line = "" + + while last_successful_byte < int(file.size): + end_byte = last_successful_byte + CHUNK_SIZE + + if end_byte > file.size: + end_byte = file.size + + chunks = source.download_object_range_as_stream( + file, + start_bytes=last_successful_byte, + end_bytes=end_byte + ) + for chunk in chunks: + chunk = chunk.decode('utf-8') + + # Loop through this chunk, if we find a newline it's time to process + # otherwise just keep appending. + for char in chunk: + if char == "\n" and line: + sender.transform_and_queue(line) + line = "" + else: + line += char + + last_successful_byte = end_byte + # Sometimes the file doesn't end with a newline, we try to use + # any remaining bytes as a final line. + if line: + sender.transform_and_queue(line) # pragma: no cover + + # Give the queue a chance to send any remaining events left in the queue + sender.finalize() + + +def get_source_config_from_options(source_config_options): + """ + Prepare our source configuration from the configuration JSON. 
+ """ + source_config = json.loads(source_config_options) + try: + source_prefix = source_config.pop("prefix") + source_container = source_config.pop("container") + return source_config, source_container, source_prefix + except KeyError as e: + print("The following keys must be defined in source_config: 'prefix', 'container'") + raise e + + +def get_dest_config_from_options(destination_provider, dest_config_options): + """ + Prepare our destination configuration. + + All None's if these are being sent to an LRS, or use values from the destination_configuration JSON option. + """ + if destination_provider != "LRS": + dest_config = json.loads(dest_config_options) + try: + dest_container = dest_config.pop("container") + dest_prefix = dest_config.pop("prefix") + except KeyError as e: + print("If not using the 'LRS' destination, the following keys must be defined in " + "destination_config: 'prefix', 'container'") + raise e + else: + dest_config = dest_container = dest_prefix = None + + return dest_config, dest_container, dest_prefix + + +def validate_source_and_files(driver, container_name, prefix): + """ + Validate that the given libcloud source exists and has files in it to read. + """ + container = driver.get_container(container_name) + objects = list(driver.iterate_container_objects(container, prefix)) + if not objects: + raise FileNotFoundError(f"No files found in {container_name}/{prefix}*") + print(f"Found {len(objects)} files in {container_name}/{prefix}*") + return [f"{obj.name} - {obj.size} bytes" for obj in objects] + + +def validate_destination(driver, container_name, prefix, source_objects): + """ + Validate that the given libcloud destination exists and can be written to. + """ + container = driver.get_container(container_name) + full_path = f"{prefix}/manifest.log" + file_list = "\n".join(source_objects) + driver.upload_object_via_stream( + iterator=BytesIO(file_list.encode()), + container=container, + object_name=full_path + ) + print(f"Wrote source file list to '{container_name}/{full_path}'") + + +def get_libcloud_drivers(source_provider, source_config, destination_provider, destination_config): + """ + Attempt to configure the libcloud drivers for source and destination. + """ + try: + source_provider = getattr(Provider, source_provider) + source_cls = get_driver(source_provider) + source_driver = source_cls(**source_config) + except AttributeError: + print(f"{source_provider} is not a valid source Libcloud provider.") + raise + + # There is no driver for LRS + destination_driver = "LRS" + if destination_provider != "LRS": + try: + destination_provider = getattr(Provider, destination_provider) + destination_cls = get_driver(destination_provider) + destination_driver = destination_cls(**destination_config) + except AttributeError: + print(f"{destination_provider} is not a valid destination Libcloud provider.") + raise + + return source_driver, destination_driver + + +class Command(BaseCommand): + """ + Transform tracking logs to an LRS or other output destination. + """ + help = dedent(__doc__).strip() + + def add_arguments(self, parser): + parser.add_argument( + '--source_provider', + type=str, + help="An Apache Libcloud 'provider constant' from: " + "https://libcloud.readthedocs.io/en/stable/storage/supported_providers.html . " + "Ex: LOCAL for local storage or S3 for AWS S3.", + required=True, + ) + parser.add_argument( + '--source_config', + type=str, + help="A JSON dictionary of configuration for the source provider. Leave" + "blank the destination_provider is 'LRS'. 
See the Libcloud docs for the necessary options" + "for your destination. If your destination (S3, MinIO, etc) needs a 'bucket' or 'container' add them " + "to the config here under the key 'container'. If your source needs a prefix (ex: directory path, " + "or wildcard beginning of a filename), add it here under the key 'prefix'. If no prefix is given, " + "all files in the given location will be attempted!", + required=True, + ) + parser.add_argument( + '--destination_provider', + type=str, + default="LRS", + help="Either 'LRS' to use the default configured xAPI and/or Caliper servers" + "or an Apache Libcloud 'provider constant' from this list: " + "https://libcloud.readthedocs.io/en/stable/storage/supported_providers.html . " + "Ex: LOCAL for local storage or S3 for AWS S3.", + ) + parser.add_argument( + '--destination_config', + type=str, + help="A JSON dictionary of configuration for the destination provider. Not needed for the 'LRS' " + "destination_provider. See the Libcloud docs for the necessary options for your destination. If your " + "destination (S3, MinIO, etc) needs a 'bucket' or 'container' add them to the config here under the " + "key 'container'. If your destination needs a prefix (ex: directory path), add it here under the key " + "'prefix'. If no prefix is given, the output file(s) will be written to the base path.", + ) + parser.add_argument( + '--transformer_type', + choices=["xapi", "caliper"], + required=True, + help="The type of transformation to do, only one can be done at a time.", + ) + parser.add_argument( + '--batch_size', + type=int, + default=10000, + help="How many events to send at a time. For the LRS destination this will be one POST per this many " + "events, for all other destinations a new file will be created containing up to this many events. " + "This helps reduce memory usage in the script and increases helps with LRS performance.", + ) + parser.add_argument( + '--sleep_between_batches_secs', + type=float, + default=10.0, + help="Fractional seconds to sleep between sending batches to a destination, used to reduce load on the LMS " + "and LRSs when performing large operations.", + ) + parser.add_argument( + '--dry_run', + action="store_true", + help="Attempt to transform all lines from all files, but do not send to the destination.", + ) + + def handle(self, *args, **options): + """ + Configure the command and start the transform process. 
+ """ + source_config, source_container, source_prefix = get_source_config_from_options(options["source_config"]) + dest_config, dest_container, dest_prefix = get_dest_config_from_options( + options["destination_provider"], + options["destination_config"] + ) + + source_driver, dest_driver = get_libcloud_drivers( + options["source_provider"], + source_config, + options["destination_provider"], + dest_config + ) + + source_file_list = validate_source_and_files(source_driver, source_container, source_prefix) + if dest_driver != "LRS": + validate_destination(dest_driver, dest_container, dest_prefix, source_file_list) + else: + print(f"Found {len(source_file_list)} source files: ", *source_file_list, sep="\n") + + sender = QueuedSender( + dest_driver, + dest_container, + dest_prefix, + options["transformer_type"], + max_queue_size=options["batch_size"], + sleep_between_batches_secs=options["sleep_between_batches_secs"], + dry_run=options["dry_run"] + ) + + transform_tracking_logs( + source_driver, + source_container, + source_prefix, + sender + ) diff --git a/event_routing_backends/models.py b/event_routing_backends/models.py index e56bbb64..cc86c1d0 100644 --- a/event_routing_backends/models.py +++ b/event_routing_backends/models.py @@ -44,7 +44,7 @@ def get_value_from_dotted_path(dict_obj, dotted_key): try: for key in nested_keys: result = result[key] - except KeyError: + except (KeyError, TypeError): return None return result diff --git a/event_routing_backends/processors/caliper/__init__.py b/event_routing_backends/processors/caliper/__init__.py index 71b10334..4f34d7ba 100644 --- a/event_routing_backends/processors/caliper/__init__.py +++ b/event_routing_backends/processors/caliper/__init__.py @@ -19,3 +19,17 @@ # .. toggle_creation_date: 2021-01-01 # .. toggle_tickets: https://openedx.atlassian.net/browse/ARCHBOM-1658 CALIPER_EVENTS_ENABLED = SettingToggle("CALIPER_EVENTS_ENABLED", default=False) + + +# .. toggle_name: CALIPER_EVENT_LOGGING_ENABLED +# .. toggle_implementation: SettingToggle +# .. toggle_default: True +# .. toggle_description: Determines whether every generated xAPI statement +# gets logged to the "caliper_tracking" logger. +# .. toggle_warning: There is a performance cost to this flag related to +# how many events the system is processing, and should generally not be +# turned on unless it is being used to push events to other systems or +# for debugging. +# .. toggle_use_cases: circuit_breaker +# .. 
toggle_creation_date: 2023-06-13 +CALIPER_EVENT_LOGGING_ENABLED = SettingToggle("CALIPER_EVENT_LOGGING_ENABLED", default=True) diff --git a/event_routing_backends/processors/caliper/tests/test_caliper.py b/event_routing_backends/processors/caliper/tests/test_caliper.py index 17fcacf8..3857da23 100644 --- a/event_routing_backends/processors/caliper/tests/test_caliper.py +++ b/event_routing_backends/processors/caliper/tests/test_caliper.py @@ -58,8 +58,14 @@ def test_send_method_with_unknown_exception(self, mocked_logger, _): @patch( 'event_routing_backends.processors.caliper.transformer_processor.CaliperTransformersRegistry.get_transformer' ) + @patch('event_routing_backends.processors.caliper.transformer_processor.logger') @patch('event_routing_backends.processors.caliper.transformer_processor.caliper_logger') - def test_send_method_with_successfull_flow(self, mocked_logger, mocked_get_transformer): + def test_send_method_with_successfull_flow( + self, + mocked_caliper_logger, + mocked_logger, + mocked_get_transformer + ): transformed_event = { 'transformed_key': 'transformed_value' } @@ -76,7 +82,48 @@ def test_send_method_with_successfull_flow(self, mocked_logger, mocked_get_trans json.dumps(transformed_event) ) ), - mocked_logger.info.mock_calls + mocked_logger.debug.mock_calls + ) + + self.assertIn( + call(json.dumps(transformed_event)), + mocked_caliper_logger.info.mock_calls + ) + + @override_settings(CALIPER_EVENT_LOGGING_ENABLED=False) + @patch( + 'event_routing_backends.processors.caliper.transformer_processor.CaliperTransformersRegistry.get_transformer' + ) + @patch('event_routing_backends.processors.caliper.transformer_processor.logger') + @patch('event_routing_backends.processors.caliper.transformer_processor.caliper_logger') + def test_send_method_with_successfull_flow_logging_disabled( + self, + mocked_caliper_logger, + mocked_logger, + mocked_get_transformer + ): + transformed_event = { + 'transformed_key': 'transformed_value' + } + mocked_transformer = MagicMock() + mocked_transformer.transform.return_value = transformed_event + mocked_get_transformer.return_value = mocked_transformer + + self.processor(self.sample_event) + + self.assertIn( + call( + 'Caliper version of edx event "{}" is: {}'.format( + self.sample_event.get('name'), + json.dumps(transformed_event) + ) + ), + mocked_logger.debug.mock_calls + ) + + self.assertNotIn( + call(json.dumps(transformed_event)), + mocked_caliper_logger.info.mock_calls ) @patch('event_routing_backends.processors.mixins.base_transformer_processor.logger') diff --git a/event_routing_backends/processors/caliper/transformer_processor.py b/event_routing_backends/processors/caliper/transformer_processor.py index 3bdc071a..45b8b7ec 100644 --- a/event_routing_backends/processors/caliper/transformer_processor.py +++ b/event_routing_backends/processors/caliper/transformer_processor.py @@ -6,10 +6,11 @@ from eventtracking.processors.exceptions import NoBackendEnabled -from event_routing_backends.processors.caliper import CALIPER_EVENTS_ENABLED +from event_routing_backends.processors.caliper import CALIPER_EVENT_LOGGING_ENABLED, CALIPER_EVENTS_ENABLED from event_routing_backends.processors.caliper.registry import CaliperTransformersRegistry from event_routing_backends.processors.mixins.base_transformer_processor import BaseTransformerProcessorMixin +logger = getLogger(__name__) caliper_logger = getLogger('caliper_tracking') @@ -44,7 +45,14 @@ def transform_event(self, event): transformed_event = super().transform_event(event) if 
transformed_event: - caliper_logger.info('Caliper version of edx event "{}" is: {}'.format(event["name"], - json.dumps(transformed_event))) + json_event = json.dumps(transformed_event) + + if CALIPER_EVENT_LOGGING_ENABLED.is_enabled(): + caliper_logger.info(json_event) + + logger.debug('Caliper version of edx event "{}" is: {}'.format( + event["name"], + json_event + )) return transformed_event diff --git a/event_routing_backends/processors/mixins/base_transformer.py b/event_routing_backends/processors/mixins/base_transformer.py index b72e005e..78cf8398 100644 --- a/event_routing_backends/processors/mixins/base_transformer.py +++ b/event_routing_backends/processors/mixins/base_transformer.py @@ -126,9 +126,11 @@ def extract_username_or_userid(self): Returns: str """ - username_or_id = self.get_data('context.username') or self.get_data('context.user_id') + username_or_id = self.get_data('username') or self.get_data('user_id') if not username_or_id: username_or_id = self.get_data('data.username') or self.get_data('data.user_id') + if not username_or_id: + username_or_id = self.get_data('context.username') or self.get_data('context.user_id') return username_or_id def extract_sessionid(self): @@ -138,7 +140,7 @@ def extract_sessionid(self): Returns: str """ - return self.get_data('context.session') or self.get_data('data.session') + return self.get_data('session') or self.get_data('context.session') or self.get_data('data.session') def get_data(self, key, required=False): """ @@ -210,6 +212,7 @@ def get_object_iri(self, object_type, object_id): """ if object_id is None or object_type is None: return None + return '{root_url}/{object_type}/{object_id}'.format( root_url=settings.LMS_ROOT_URL, object_type=object_type, diff --git a/event_routing_backends/processors/xapi/__init__.py b/event_routing_backends/processors/xapi/__init__.py index 479b6ce9..c912628f 100644 --- a/event_routing_backends/processors/xapi/__init__.py +++ b/event_routing_backends/processors/xapi/__init__.py @@ -19,3 +19,16 @@ # .. toggle_creation_date: 2021-01-01 # .. toggle_tickets: https://openedx.atlassian.net/browse/ARCHBOM-1658 XAPI_EVENTS_ENABLED = SettingToggle("XAPI_EVENTS_ENABLED", default=False) + +# .. toggle_name: XAPI_EVENT_LOGGING_ENABLED +# .. toggle_implementation: SettingToggle +# .. toggle_default: True +# .. toggle_description: Determines whether every generated xAPI statement +# gets logged to the "xapi_tracking" logger. +# .. toggle_warning: There is a performance cost to this flag related to +# how many events the system is processing, and it should generally not be +# turned on unless it is being used to push events to other systems or +# for debugging. +# .. toggle_use_cases: circuit_breaker +# .. 
toggle_creation_date: 2023-06-13 +XAPI_EVENT_LOGGING_ENABLED = SettingToggle("XAPI_EVENT_LOGGING_ENABLED", default=True) diff --git a/event_routing_backends/processors/xapi/event_transformers/navigation_events.py b/event_routing_backends/processors/xapi/event_transformers/navigation_events.py index 6c1ad68f..8402f1dc 100644 --- a/event_routing_backends/processors/xapi/event_transformers/navigation_events.py +++ b/event_routing_backends/processors/xapi/event_transformers/navigation_events.py @@ -58,7 +58,7 @@ def get_object(self): `Activity` """ return Activity( - id=self.get_data('data.target_url'), + id=self.get_data('data.target_url', True), definition=ActivityDefinition( type=constants.XAPI_ACTIVITY_LINK ), diff --git a/event_routing_backends/processors/xapi/event_transformers/problem_interaction_events.py b/event_routing_backends/processors/xapi/event_transformers/problem_interaction_events.py index e390d6e7..6fa7cd88 100644 --- a/event_routing_backends/processors/xapi/event_transformers/problem_interaction_events.py +++ b/event_routing_backends/processors/xapi/event_transformers/problem_interaction_events.py @@ -73,17 +73,16 @@ class BaseProblemsTransformer(XApiTransformer, XApiVerbTransformerMixin): def get_object(self): """ Get object for xAPI transformed event. - Returns: `Activity` """ object_id = None data = self.get_data('data') if data and isinstance(data, dict): - object_id = data.get('problem_id', data.get('module_id', None)) + object_id = self.get_data('data.problem_id') or self.get_data('data.module_id', True) event_name = self.get_data('name', True) - # TODO: Add definition[name] of problem once it is added in the event. + return Activity( id=object_id, definition=ActivityDefinition( @@ -133,6 +132,17 @@ class ProblemSubmittedTransformer(BaseProblemsTransformer): """ additional_fields = ('result', ) + def get_object(self): + """ + Get object for xAPI transformed event. + + Returns: + `Activity` + """ + xapi_object = super().get_object() + xapi_object.id = self.get_object_iri('xblock', xapi_object.id) + return xapi_object + def get_result(self): """ Get result for xAPI transformed event. @@ -170,9 +180,11 @@ def get_object(self): # If the event was generated from browser, there is no `problem_id` # or `module_id` field. Therefore we get block id from the referrer. 
- if self.get_data('context.event_source') == 'browser': + event_source = self.get_data('context.event_source') or self.get_data('event_source') + referer = self.get_data('referer') or self.get_data('context.referer', True) + if event_source == 'browser': block_id = get_problem_block_id( - self.get_data('context.referer', True), + referer, self.get_data('data'), self.get_data('context.course_id') ) @@ -235,7 +247,8 @@ def get_result(self): Result """ # Do not transform result if the event is generated from browser - if self.get_data('context.event_source') == 'browser': + source = self.get_data('event_source') or self.get_data('context.event_source') + if source == 'browser': return None event_data = self.get_data('data') diff --git a/event_routing_backends/processors/xapi/event_transformers/video_events.py b/event_routing_backends/processors/xapi/event_transformers/video_events.py index 2fbd77c7..412b3310 100644 --- a/event_routing_backends/processors/xapi/event_transformers/video_events.py +++ b/event_routing_backends/processors/xapi/event_transformers/video_events.py @@ -141,9 +141,7 @@ def get_object(self): `Activity` """ course_id = self.get_data('context.course_id', True) - video_id = self.get_data('data.id', True) - object_id = make_video_block_id(course_id=course_id, video_id=video_id) return Activity( @@ -200,9 +198,10 @@ def get_result(self): Returns: `Result` """ + current_time = self.get_data('data.current_time') or self.get_data('data.currentTime') return Result( extensions=Extensions({ - constants.XAPI_RESULT_VIDEO_TIME: convert_seconds_to_float(self.get_data('data.currentTime')) + constants.XAPI_RESULT_VIDEO_TIME: convert_seconds_to_float(current_time) }) ) @@ -237,11 +236,11 @@ def get_result(self): 'show_transcript', ] ) + current_time = self.get_data('data.current_time') or self.get_data('data.currentTime') + return Result( extensions=Extensions({ - constants.XAPI_RESULT_VIDEO_TIME: convert_seconds_to_float( - self.get_data('data.currentTime') or self.get_data('data.current_time') - ), + constants.XAPI_RESULT_VIDEO_TIME: convert_seconds_to_float(current_time), constants.XAPI_RESULT_VIDEO_CC_ENABLED: cc_enabled }) ) diff --git a/event_routing_backends/processors/xapi/tests/fixtures/expected/edx.grades.problem.submitted.json b/event_routing_backends/processors/xapi/tests/fixtures/expected/edx.grades.problem.submitted.json index 9a154e1b..32f81365 100644 --- a/event_routing_backends/processors/xapi/tests/fixtures/expected/edx.grades.problem.submitted.json +++ b/event_routing_backends/processors/xapi/tests/fixtures/expected/edx.grades.problem.submitted.json @@ -28,7 +28,7 @@ "definition": { "type": "http://adlnet.gov/expapi/activities/question" }, - "id": "block-v1:edX+DemoX+Demo_Course+type@problem+block@3fc5461f86764ad7bdbdf6cbdde61e66", + "id": "http://localhost:18000/xblock/block-v1:edX+DemoX+Demo_Course+type@problem+block@3fc5461f86764ad7bdbdf6cbdde61e66", "objectType": "Activity" }, "result": { diff --git a/event_routing_backends/processors/xapi/tests/test_xapi.py b/event_routing_backends/processors/xapi/tests/test_xapi.py index c898c475..9bb044ba 100644 --- a/event_routing_backends/processors/xapi/tests/test_xapi.py +++ b/event_routing_backends/processors/xapi/tests/test_xapi.py @@ -1,12 +1,13 @@ """ Test the xAPI processor. 
""" +import uuid from django.test import SimpleTestCase from django.test.utils import override_settings from eventtracking.processors.exceptions import EventEmissionExit, NoBackendEnabled from mock import MagicMock, call, patch, sentinel -from tincan import Statement +from tincan import Activity, Statement from event_routing_backends.processors.xapi.transformer_processor import XApiProcessor @@ -56,6 +57,7 @@ def test_send_method_with_unknown_exception(self, mocked_logger, _): @patch('event_routing_backends.processors.xapi.transformer_processor.xapi_logger') def test_send_method_with_successfull_flow(self, mocked_logger, mocked_get_transformer): transformed_event = Statement() + transformed_event.object = Activity(id=str(uuid.uuid4())) mocked_transformer = MagicMock() mocked_transformer.transform.return_value = transformed_event mocked_get_transformer.return_value = mocked_transformer @@ -64,6 +66,37 @@ def test_send_method_with_successfull_flow(self, mocked_logger, mocked_get_trans self.assertIn(call(transformed_event.to_json()), mocked_logger.mock_calls) + @patch( + 'event_routing_backends.processors.xapi.transformer_processor.XApiTransformersRegistry.get_transformer' + ) + @patch('event_routing_backends.processors.xapi.transformer_processor.xapi_logger') + def test_send_method_with_invalid_object(self, mocked_logger, mocked_get_transformer): + transformed_event = Statement() + mocked_transformer = MagicMock() + mocked_transformer.transform.return_value = transformed_event + mocked_get_transformer.return_value = mocked_transformer + + with self.assertRaises(EventEmissionExit): + self.processor(self.sample_event) + + self.assertNotIn(call(transformed_event.to_json()), mocked_logger.mock_calls) + + @override_settings(XAPI_EVENT_LOGGING_ENABLED=False) + @patch( + 'event_routing_backends.processors.xapi.transformer_processor.XApiTransformersRegistry.get_transformer' + ) + @patch('event_routing_backends.processors.xapi.transformer_processor.xapi_logger') + def test_send_method_with_successfull_flow_no_logger(self, mocked_logger, mocked_get_transformer): + transformed_event = Statement() + transformed_event.object = Activity(id=str(uuid.uuid4())) + mocked_transformer = MagicMock() + mocked_transformer.transform.return_value = transformed_event + mocked_get_transformer.return_value = mocked_transformer + + self.processor(self.sample_event) + + self.assertNotIn(call(transformed_event.to_json()), mocked_logger.mock_calls) + @patch('event_routing_backends.processors.mixins.base_transformer_processor.logger') def test_with_no_registry(self, mocked_logger): backend = XApiProcessor() diff --git a/event_routing_backends/processors/xapi/transformer.py b/event_routing_backends/processors/xapi/transformer.py index 482fab15..74f2ff0e 100644 --- a/event_routing_backends/processors/xapi/transformer.py +++ b/event_routing_backends/processors/xapi/transformer.py @@ -162,7 +162,8 @@ def verb(self): """ event_name = self.get_data('name', True) - if self.get_data('context.event_source') == 'browser' and event_name == 'problem_check': + event_source = self.get_data('event_source') or self.get_data('context.event_source') + if event_source == 'browser' and event_name == 'problem_check': verb = self.verb_map['problem_check_browser'] else: verb = self.verb_map[event_name] diff --git a/event_routing_backends/processors/xapi/transformer_processor.py b/event_routing_backends/processors/xapi/transformer_processor.py index 7aab55ef..71ac712d 100644 --- a/event_routing_backends/processors/xapi/transformer_processor.py +++ 
b/event_routing_backends/processors/xapi/transformer_processor.py @@ -7,9 +7,10 @@ from eventtracking.processors.exceptions import NoBackendEnabled from event_routing_backends.processors.mixins.base_transformer_processor import BaseTransformerProcessorMixin -from event_routing_backends.processors.xapi import XAPI_EVENTS_ENABLED +from event_routing_backends.processors.xapi import XAPI_EVENT_LOGGING_ENABLED, XAPI_EVENTS_ENABLED from event_routing_backends.processors.xapi.registry import XApiTransformersRegistry +logger = getLogger(__name__) xapi_logger = getLogger('xapi_tracking') @@ -45,8 +46,15 @@ def transform_event(self, event): if transformed_event: event_json = transformed_event.to_json() - xapi_logger.info(event_json) - xapi_logger.info('xAPI statement of edx event "{}" is: {}'.format(event["name"], event_json)) + + if not transformed_event.object or not transformed_event.object.id: + logger.debug('xAPI statement of edx event "{}" has no object id: {}'.format(event["name"], event_json)) + return None + + if XAPI_EVENT_LOGGING_ENABLED.is_enabled(): + xapi_logger.info(event_json) + + logger.debug('xAPI statement of edx event "{}" is: {}'.format(event["name"], event_json)) return json.loads(event_json) return transformed_event diff --git a/event_routing_backends/settings/common.py b/event_routing_backends/settings/common.py index 626df276..ef3d792b 100644 --- a/event_routing_backends/settings/common.py +++ b/event_routing_backends/settings/common.py @@ -8,7 +8,9 @@ def plugin_settings(settings): Adds default settings for the event_routing_backends app. """ settings.CALIPER_EVENTS_ENABLED = False + settings.CALIPER_EVENT_LOGGING_ENABLED = False settings.XAPI_EVENTS_ENABLED = True + settings.XAPI_EVENT_LOGGING_ENABLED = True settings.EVENT_ROUTING_BACKEND_MAX_RETRIES = 3 settings.EVENT_ROUTING_BACKEND_COUNTDOWN = 30 diff --git a/event_routing_backends/settings/production.py b/event_routing_backends/settings/production.py index 0b8aefd2..2cd9e926 100644 --- a/event_routing_backends/settings/production.py +++ b/event_routing_backends/settings/production.py @@ -19,10 +19,18 @@ def plugin_settings(settings): 'CALIPER_EVENTS_ENABLED', settings.CALIPER_EVENTS_ENABLED ) + settings.CALIPER_EVENT_LOGGING_ENABLED = settings.ENV_TOKENS.get( + 'CALIPER_EVENT_LOGGING_ENABLED', + settings.CALIPER_EVENT_LOGGING_ENABLED + ) settings.XAPI_EVENTS_ENABLED = settings.ENV_TOKENS.get( 'XAPI_EVENTS_ENABLED', settings.XAPI_EVENTS_ENABLED ) + settings.XAPI_EVENT_LOGGING_ENABLED = settings.ENV_TOKENS.get( + 'XAPI_EVENT_LOGGING_ENABLED', + settings.XAPI_EVENT_LOGGING_ENABLED + ) settings.EVENT_TRACKING_BACKENDS = settings.ENV_TOKENS.get( 'EVENT_TRACKING_BACKENDS', settings.EVENT_TRACKING_BACKENDS diff --git a/event_routing_backends/tasks.py b/event_routing_backends/tasks.py index 6f5e4f00..19e17e3c 100644 --- a/event_routing_backends/tasks.py +++ b/event_routing_backends/tasks.py @@ -82,5 +82,57 @@ def send_event(task, event_name, event, router_type, host_config): ), exc_info=True ) + raise task.retry(exc=exc, countdown=getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 30), + max_retries=getattr(settings, 'EVENT_ROUTING_BACKEND_MAX_RETRIES', 3)) + + +@shared_task(bind=True) +def dispatch_bulk_events(self, events, router_type, host_config): + """ + Send a batch of events to the same configured client. + + Arguments: + self (object) : celery task object to perform celery actions + events (list[dict]) : list of event dictionaries to be delivered. 
+ router_type (str) : decides the client to use for sending the events + host_config (dict) : contains configurations for the host. + """ + bulk_send_events(self, events, router_type, host_config) + + +def bulk_send_events(task, events, router_type, host_config): + """ + Send events to the configured client. + + Arguments: + task (object) : celery task object to perform celery actions + events (list[dict]) : list of event dictionaries to be delivered. + router_type (str) : decides the client to use for sending the events + host_config (dict) : contains configurations for the host. + """ + try: + client_class = ROUTER_STRATEGY_MAPPING[router_type] + except KeyError: + logger.error('Unsupported routing strategy detected: {}'.format(router_type)) + return + + try: + client = client_class(**host_config) + client.bulk_send(events) + logger.debug( + 'Successfully bulk dispatched transformed versions of {} events using client: {}'.format( + len(events), + client_class + ) + ) + except EventNotDispatched as exc: + logger.exception( + 'Exception occurred while trying to bulk dispatch {} events using client: {}'.format( + len(events), + client_class + ), + exc_info=True + ) raise task.retry(exc=exc, countdown=getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 30), max_retries=getattr(settings, 'EVENT_ROUTING_BACKEND_MAX_RETRIES', 3)) diff --git a/event_routing_backends/tests/test_helpers.py b/event_routing_backends/tests/test_helpers.py index 5008c3cf..3ab51a34 100644 --- a/event_routing_backends/tests/test_helpers.py +++ b/event_routing_backends/tests/test_helpers.py @@ -8,6 +8,7 @@ from event_routing_backends.helpers import ( get_anonymous_user_id, get_block_id_from_event_referrer, + get_course_from_id, get_user_email, get_uuid5, ) @@ -73,3 +74,9 @@ def test_get_uuid5(self): name = f'{another_actor}-{timestamp}' uuid_3 = get_uuid5(verb, name) self.assertNotEqual(uuid_1, uuid_3) + + @patch('event_routing_backends.helpers.get_course_overviews') + def test_get_course_from_id(self, mock_get_course_overviews): + mock_get_course_overviews.return_value = [] + course = get_course_from_id("foo") + self.assertEqual(course["display_name"], "Unknown Course") diff --git a/event_routing_backends/tests/test_settings.py b/event_routing_backends/tests/test_settings.py index f655bdba..95f1ef13 100644 --- a/event_routing_backends/tests/test_settings.py +++ b/event_routing_backends/tests/test_settings.py @@ -24,7 +24,9 @@ def test_common_settings(self): self.assertIn('caliper', settings.EVENT_TRACKING_BACKENDS) self.assertIn('edx.course.enrollment.activated', settings.EVENT_TRACKING_BACKENDS_BUSINESS_CRITICAL_EVENTS) self.assertFalse(settings.CALIPER_EVENTS_ENABLED) + self.assertFalse(settings.CALIPER_EVENT_LOGGING_ENABLED) self.assertTrue(settings.XAPI_EVENTS_ENABLED) + self.assertTrue(settings.XAPI_EVENT_LOGGING_ENABLED) def test_devstack_settings(self): """ @@ -35,7 +37,9 @@ def test_devstack_settings(self): self.assertIn('caliper', settings.EVENT_TRACKING_BACKENDS) self.assertIn('edx.course.enrollment.deactivated', settings.EVENT_TRACKING_BACKENDS_BUSINESS_CRITICAL_EVENTS) self.assertFalse(settings.CALIPER_EVENTS_ENABLED) + self.assertFalse(settings.CALIPER_EVENT_LOGGING_ENABLED) self.assertTrue(settings.XAPI_EVENTS_ENABLED) + self.assertTrue(settings.XAPI_EVENT_LOGGING_ENABLED) def test_production_settings(self): """ @@ -44,11 +48,15 @@ def test_production_settings(self): settings.ENV_TOKENS = { 'EVENT_TRACKING_BACKENDS': None, 'CALIPER_EVENTS_ENABLED': False, + 'CALIPER_EVENT_LOGGING_ENABLED': True, 
'XAPI_EVENTS_ENABLED': False, + 'XAPI_EVENT_LOGGING_ENABLED': True, 'EVENT_TRACKING_BACKENDS_BUSINESS_CRITICAL_EVENTS': [], } production_setttings.plugin_settings(settings) self.assertIsNone(settings.EVENT_TRACKING_BACKENDS) self.assertFalse(bool(settings.EVENT_TRACKING_BACKENDS_BUSINESS_CRITICAL_EVENTS)) self.assertFalse(settings.CALIPER_EVENTS_ENABLED) + self.assertTrue(settings.CALIPER_EVENT_LOGGING_ENABLED) self.assertFalse(settings.XAPI_EVENTS_ENABLED) + self.assertTrue(settings.XAPI_EVENT_LOGGING_ENABLED) diff --git a/event_routing_backends/utils/http_client.py b/event_routing_backends/utils/http_client.py index 316e84a9..eead052c 100644 --- a/event_routing_backends/utils/http_client.py +++ b/event_routing_backends/utils/http_client.py @@ -49,6 +49,42 @@ def get_auth_header(self): } return {} + def bulk_send(self, events): + """ + Send the list of events to a configured remote. + + Arguments: + events (list[dict]) : list of event payloads to send to host. + + Returns: + requests.Response object + """ + headers = self.HEADERS.copy() + headers.update(self.get_auth_header()) + + options = self.options.copy() + options.update({ + 'url': self.URL, + 'json': events, + 'headers': headers, + }) + if self.AUTH_SCHEME == RouterConfiguration.AUTH_BASIC: + options.update({'auth': (self.username, self.password)}) + logger.debug('Sending Caliper version of {} edx events to {}'.format(len(events), self.URL)) + response = requests.post(**options) # pylint: disable=missing-timeout + + if not 200 <= response.status_code < 300: + logger.warning( + '{} request failed for sending Caliper version of {} edx events to {}. Response code: {}. ' + 'Response: ' + '{}'.format( + response.request.method, + len(events), self.URL, + response.status_code, + response.text + )) + raise EventNotDispatched + def send(self, event, event_name): """ Send the event to configured remote. diff --git a/event_routing_backends/utils/xapi_lrs_client.py b/event_routing_backends/utils/xapi_lrs_client.py index 6b7ff57d..4e9f1216 100644 --- a/event_routing_backends/utils/xapi_lrs_client.py +++ b/event_routing_backends/utils/xapi_lrs_client.py @@ -60,6 +60,30 @@ def get_auth_header_value(self): return None + def bulk_send(self, statement_data): + """ + Send a batch of xAPI statements to configured remote. + + Arguments: + statement_data (List[Statement]) : a list of transformed xAPI statements + + Returns: + requests.Response object + """ + logger.debug('Sending {} xAPI statements to {}'.format(len(statement_data), self.URL)) + + response = self.lrs_client.save_statements(statement_data) + + if not response.success: + if response.response.code == 409: + logger.warning(f"Duplicate event id found in: {response.request.content}") + else: + logger.warning(f"Failed request: {response.request.content}") + logger.warning('{} request failed for sending xAPI statements of edx events to {}. ' + 'Response code: {}. Response: {}'.format(response.request.method, self.URL, + response.response.code, response.data)) + raise EventNotDispatched + def send(self, statement_data, event_name): """ Send the xAPI statement to configured remote. 
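
For reference, here is a rough sketch of driving the bulk transform from Python inside an LMS shell, using only the options the new management command defines above; the paths and numbers are illustrative assumptions, not values taken from this patch::

    # Hypothetical invocation, equivalent to running the command via
    # "manage.py lms transform_tracking_logs ...". Requires a running LMS
    # with this plugin installed, since transforms hit the database.
    from django.core.management import call_command

    call_command(
        "transform_tracking_logs",
        transformer_type="xapi",  # or "caliper"
        source_provider="LOCAL",
        # For LOCAL: "key" is a parent directory, "container" a subdirectory,
        # and "prefix" narrows which files are read (illustrative paths).
        source_config='{"key": "/openedx/data", "container": "logs", "prefix": "tracking.log"}',
        destination_provider="LRS",  # the default: send to the configured LRSs
        batch_size=1000,
        sleep_between_batches_secs=2.5,
        dry_run=True,  # transform everything but send nothing
    )
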
diff --git a/requirements/base.in b/requirements/base.in index c13359c7..17a1fb7c 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -1,7 +1,7 @@ # Core requirements for using this application -c constraints.txt -Django # Web application framework +Django # Web application framework isodate pytz requests @@ -13,3 +13,5 @@ edx-toggles # For SettingToggle class tincan event-tracking edx-celeryutils +apache-libcloud # For bulk event log loading +fasteners # Locking tools, required by apache-libcloud, but somehow not installed with it diff --git a/requirements/base.txt b/requirements/base.txt index 0b1284be..65970916 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -8,6 +8,8 @@ amqp==5.1.1 # via kombu aniso8601==9.0.1 # via tincan +apache-libcloud==3.7.0 + # via -r requirements/base.in asgiref==3.7.2 # via django backports-zoneinfo[tzdata]==0.2.1 @@ -86,6 +88,8 @@ edx-toggles==5.0.0 # via -r requirements/base.in event-tracking==2.1.0 # via -r requirements/base.in +fasteners==0.18 + # via -r requirements/base.in idna==3.4 # via requests isodate==0.6.1 @@ -130,7 +134,9 @@ pytz==2023.3 pyyaml==6.0 # via code-annotations requests==2.31.0 - # via -r requirements/base.in + # via + # -r requirements/base.in + # apache-libcloud six==1.16.0 # via # click-repl @@ -155,7 +161,7 @@ tzdata==2023.3 # via # backports-zoneinfo # celery -urllib3==2.0.2 +urllib3==2.0.3 # via requests vine==5.0.0 # via diff --git a/requirements/dev.txt b/requirements/dev.txt index 71892d3b..7ede33c8 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -12,6 +12,8 @@ aniso8601==9.0.1 # via # -r requirements/quality.txt # tincan +apache-libcloud==3.7.0 + # via -r requirements/quality.txt asgiref==3.7.2 # via # -r requirements/quality.txt @@ -170,6 +172,8 @@ faker==18.10.1 # via # -r requirements/quality.txt # factory-boy +fasteners==0.18 + # via -r requirements/quality.txt filelock==3.12.0 # via # -r requirements/ci.txt @@ -274,7 +278,7 @@ pycparser==2.21 # via # -r requirements/quality.txt # cffi -pydantic==1.10.8 +pydantic==1.10.9 # via inflect pydocstyle==6.3.0 # via -r requirements/quality.txt @@ -343,7 +347,9 @@ pyyaml==6.0 # code-annotations # edx-i18n-tools requests==2.31.0 - # via -r requirements/quality.txt + # via + # -r requirements/quality.txt + # apache-libcloud six==1.16.0 # via # -r requirements/ci.txt @@ -408,7 +414,7 @@ tzdata==2023.3 # -r requirements/quality.txt # backports-zoneinfo # celery -urllib3==2.0.2 +urllib3==2.0.3 # via # -r requirements/quality.txt # requests diff --git a/requirements/doc.txt b/requirements/doc.txt index 0f9f1830..d2a6d572 100644 --- a/requirements/doc.txt +++ b/requirements/doc.txt @@ -16,6 +16,8 @@ aniso8601==9.0.1 # via # -r requirements/test.txt # tincan +apache-libcloud==3.7.0 + # via -r requirements/test.txt asgiref==3.7.2 # via # -r requirements/test.txt @@ -90,7 +92,6 @@ cryptography==41.0.1 # via # -r requirements/test.txt # djfernet - # secretstorage ddt==1.6.0 # via -r requirements/test.txt django==3.2.19 @@ -162,6 +163,8 @@ faker==18.10.1 # via # -r requirements/test.txt # factory-boy +fasteners==0.18 + # via -r requirements/test.txt idna==3.4 # via # -r requirements/test.txt @@ -182,10 +185,6 @@ isodate==0.6.1 # via -r requirements/test.txt jaraco-classes==3.2.3 # via keyring -jeepney==0.8.0 - # via - # keyring - # secretstorage jinja2==3.1.2 # via # -r requirements/test.txt @@ -301,6 +300,7 @@ readme-renderer==37.3 requests==2.31.0 # via # -r requirements/test.txt + # apache-libcloud # requests-toolbelt # sphinx # twine @@ -312,8 
+312,6 @@ rfc3986==2.0.0 # via twine rich==13.4.1 # via twine -secretstorage==3.3.3 - # via keyring six==1.16.0 # via # -r requirements/test.txt @@ -384,7 +382,7 @@ tzdata==2023.3 # -r requirements/test.txt # backports-zoneinfo # celery -urllib3==2.0.2 +urllib3==2.0.3 # via # -r requirements/test.txt # requests diff --git a/requirements/quality.txt b/requirements/quality.txt index d230d496..add6a371 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -12,6 +12,8 @@ aniso8601==9.0.1 # via # -r requirements/test.txt # tincan +apache-libcloud==3.7.0 + # via -r requirements/test.txt asgiref==3.7.2 # via # -r requirements/test.txt @@ -148,6 +150,8 @@ faker==18.10.1 # via # -r requirements/test.txt # factory-boy +fasteners==0.18 + # via -r requirements/test.txt idna==3.4 # via # -r requirements/test.txt @@ -270,7 +274,9 @@ pyyaml==6.0 # -r requirements/test.txt # code-annotations requests==2.31.0 - # via -r requirements/test.txt + # via + # -r requirements/test.txt + # apache-libcloud six==1.16.0 # via # -r requirements/test.txt @@ -316,7 +322,7 @@ tzdata==2023.3 # -r requirements/test.txt # backports-zoneinfo # celery -urllib3==2.0.2 +urllib3==2.0.3 # via # -r requirements/test.txt # requests diff --git a/requirements/test.txt b/requirements/test.txt index 251e60da..96f5afc5 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -12,6 +12,8 @@ aniso8601==9.0.1 # via # -r requirements/base.txt # tincan +apache-libcloud==3.7.0 + # via -r requirements/base.txt asgiref==3.7.2 # via # -r requirements/base.txt @@ -129,6 +131,8 @@ factory-boy==3.2.1 # via -r requirements/test.in faker==18.10.1 # via factory-boy +fasteners==0.18 + # via -r requirements/base.txt idna==3.4 # via # -r requirements/base.txt @@ -216,7 +220,9 @@ pyyaml==6.0 # -r requirements/base.txt # code-annotations requests==2.31.0 - # via -r requirements/base.txt + # via + # -r requirements/base.txt + # apache-libcloud six==1.16.0 # via # -r requirements/base.txt @@ -253,7 +259,7 @@ tzdata==2023.3 # -r requirements/base.txt # backports-zoneinfo # celery -urllib3==2.0.2 +urllib3==2.0.3 # via # -r requirements/base.txt # requests diff --git a/test_settings.py b/test_settings.py index e40d9d77..23efece7 100644 --- a/test_settings.py +++ b/test_settings.py @@ -40,17 +40,12 @@ def root(*args): ] SECRET_KEY = 'insecure-secret-key' - LMS_ROOT_URL = 'http://localhost:18000' - CELERY_ALWAYS_EAGER = True - XAPI_EVENTS_ENABLED = True - +XAPI_EVENT_LOGGING_ENABLED = False RUNNING_WITH_TEST_SETTINGS = True - EVENT_TRACKING_BACKENDS = {} - XAPI_AGENT_IFI_TYPE = 'external_id' _mock_third_party_modules()
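
A closing note on the new toggles: ``XAPI_EVENT_LOGGING_ENABLED`` and ``CALIPER_EVENT_LOGGING_ENABLED`` only control whether each transformed statement is emitted to the ``xapi_tracking`` / ``caliper_tracking`` loggers defined above; where those records end up is ordinary Django logging configuration. A minimal sketch, assuming a standard LMS ``LOGGING`` dict is in scope (the handler name and file path are assumptions, not part of this patch)::

    # Hypothetical production settings snippet: keep xAPI statement logging on
    # and write the statements to their own rotating file.
    XAPI_EVENT_LOGGING_ENABLED = True
    CALIPER_EVENT_LOGGING_ENABLED = False  # leave Caliper statement logging off

    LOGGING["handlers"]["xapi_file"] = {
        "class": "logging.handlers.RotatingFileHandler",  # stdlib handler
        "filename": "/openedx/data/logs/xapi.log",        # assumed path
        "maxBytes": 100 * 1024 * 1024,
        "backupCount": 5,
    }
    LOGGING["loggers"]["xapi_tracking"] = {
        "handlers": ["xapi_file"],
        "level": "INFO",
        "propagate": False,  # keep statements out of the root logger
    }
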