feat: Management command to batch load tracking logs (#301)
* feat: Add command to bulk transform event log files

This management command can pull tracking logs from local storage or many types of cloud storage, transform them into xAPI or Caliper statements, and save the results to local storage, cloud storage, or an LRS.

* feat: Make logging of transformed statements toggleable

Added new feature toggles for logging xAPI and Caliper statements to improve performance when they're not needed.
bmtcril authored Jun 20, 2023
1 parent ca377d2 commit 46ba27a
Showing 40 changed files with 1,881 additions and 117 deletions.
4 changes: 4 additions & 0 deletions docs/getting_started.rst
@@ -70,6 +70,10 @@ Two types of configuration are needed for the plugin:

By default, both the ``xapi`` and ``caliper`` backends are configured along with filters that allow all the supported events. The ``caliper`` backend is disabled by default and can be enabled by setting ``CALIPER_EVENTS_ENABLED`` to ``True`` in plugin settings.
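
For example (a minimal sketch; exactly where this override lives depends on how your deployment manages LMS Django settings)::

    # Assumed placement: wherever your deployment overrides LMS plugin settings.
    CALIPER_EVENTS_ENABLED = True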

Additionally, separate log streams for xAPI and Caliper statements are generated as events are transformed, and each stream can be saved or ignored. They are emitted through the ``xapi_tracking`` and ``caliper_tracking`` loggers, which can be configured as described in the `Django docs <https://docs.djangoproject.com/en/4.2/topics/logging/>`_.
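
As an illustration, an override along these lines would write the xAPI statement stream to its own file and drop the Caliper stream entirely (a minimal sketch assuming a standard Django ``LOGGING`` dict; the handler names and file path are placeholders, and in a real deployment you would merge these entries into the platform's existing logging configuration)::

    LOGGING["handlers"]["xapi_statements_file"] = {
        "class": "logging.FileHandler",
        "filename": "/openedx/data/logs/xapi_statements.log",  # placeholder path
    }
    LOGGING["handlers"]["discard"] = {"class": "logging.NullHandler"}
    LOGGING["loggers"]["xapi_tracking"] = {
        "handlers": ["xapi_statements_file"],
        "level": "INFO",
        "propagate": False,
    }
    LOGGING["loggers"]["caliper_tracking"] = {
        "handlers": ["discard"],
        "propagate": False,
    }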



Router configuration
--------------------

121 changes: 121 additions & 0 deletions docs/howto/how_to_bulk_transform.rst
@@ -0,0 +1,121 @@
How To Bulk Transform Tracking Logs
===================================

This is a rough guide to transforming existing tracking log files into the formats supported by event-routing-backends using the ``transform_tracking_logs`` Django management command inside a running LMS installation. Because the transformations perform database access (looking up user, course, and block data), you will need to run this command on the same Open edX installation that created the tracking log files.

.. warning:: This also means that transforming large amounts of data can cause performance issues on the LMS and downstream learning record stores. Use the ``--batch_size`` and ``--sleep_between_batches_secs`` options to balance system performance against load time.

Sources and Destinations
------------------------

For most sources and destinations we use `Apache Libcloud Object storage <https://libcloud.readthedocs.io/en/stable/supported_providers.html>`__. This covers cases from local storage to Amazon S3, MinIO, and many other cloud storage solutions. The ``--source_provider`` and ``--destination_provider`` options are the Libcloud provider names, in all caps (ex: ``S3``, ``LOCAL``, ``MINIO``). The ``--source_config`` and ``--destination_config`` options are JSON strings passed directly to the Libcloud constructor as keyword args.
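
To picture what those options become, here is a rough sketch (not the command's actual implementation) of how a provider name and a config JSON map onto Libcloud's object storage API; the credentials and bucket name below are placeholders::

    import json

    from libcloud.storage.providers import get_driver
    from libcloud.storage.types import Provider

    # e.g. --source_provider S3 --source_config '{"key": "AWS key", "secret": "AWS secret"}'
    provider_name = "S3"
    config = json.loads('{"key": "AWS key", "secret": "AWS secret"}')

    # The provider name maps to a Libcloud Provider constant, and the JSON
    # options become keyword arguments for that provider's driver.
    driver_cls = get_driver(getattr(Provider, provider_name))
    driver = driver_cls(**config)

    # A "container" (roughly an S3 bucket) is then used to list and read objects.
    container = driver.get_container("logs")
    for obj in container.list_objects():
        print(obj.name)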

The ``LRS`` destination provider is a special case that uses the usual event-routing-backends logic for sending events to Caliper and/or xAPI learning record stores.

For the ``LOCAL`` provider, the path to the file(s) is a concatenation of the ``key``, which is the path to a top level directory, and the ``container``, which is a single subdirectory name inside the ``key`` directory. A ``prefix`` (if provided) is appended to the container to determine the final path.

::

    # This will attempt to recursively read all files in the "/openedx/data/logs/" directory
    {"key": "/openedx/data", "container": "logs"}

    # This will attempt to recursively read all files in "/openedx/data/logs/tracking/" as
    # well as any files in "/openedx/data/logs/" that begin with "tracking"
    {"key": "/openedx/data", "container": "logs", "prefix": "tracking"}

    # This will attempt to read a single file named "/openedx/data/logs/tracking.log"
    {"key": "/openedx/data", "container": "logs", "prefix": "tracking.log"}


For other providers, ``key`` and ``secret`` are authentication credentials and ``container`` is roughly synonymous with an S3 bucket. Configuration for each provider is different; please consult the Libcloud docs for your provider to learn about other options you may need to pass in the ``--source_config`` and ``--destination_config`` JSON structures.


Modes Of Operation
------------------

The command can work in a few distinct ways.

**File(s) to learning record store (LRS)** - This will use the existing event-routing-backends configuration to route any replayed log events to **all** configured LRS backends, just as if the events were being emitted right now. This can be used to backfill old data, capture old events that didn't previously have transforms, or fix up lost data from downtime issues.

**File(s) to file(s)** - This will perform the same transformations as usual, but instead of routing the statements to an LRS they are saved as a file to any libcloud destination. In this mode all events are saved to a single file and no filters are applied.

Additionally, all generated statements are written to Python loggers, which can be configured to be ignored, saved to a file, written to standard out, or sent to a log forwarder like `Vector <https://vector.dev/>`__ for more statement handling options. The two loggers are named ``xapi_tracking`` and ``caliper_tracking`` and are always active.
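
For example, if you intend to scrape the statement stream with a forwarder, one illustrative option (an assumption about your deployment, not something the plugin configures for you) is to point the xAPI logger at standard output::

    LOGGING["handlers"]["statement_stdout"] = {
        "class": "logging.StreamHandler",
        "stream": "ext://sys.stdout",
    }
    LOGGING["loggers"]["xapi_tracking"] = {
        "handlers": ["statement_stdout"],
        "level": "INFO",
        "propagate": False,
    }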

**File(s) to logger** - For any destination you can use the ``--dry_run`` flag to test finding and transforming data before attempting to store it. Used in conjunction with the loggers mentioned above, this lets you rely on Python log forwarding without the additional overhead of storing full files.
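
A minimal dry run might look like the following (the ``--dry_run`` flag is combined here with the same source and transformer options used in the examples below)::

    # Find and transform events from a local tracking log without storing the results;
    # per the paragraph above, the statement loggers still receive the transformed statements.
    python manage.py lms transform_tracking_logs \
        --source_provider LOCAL \
        --source_config '{"key": "/openedx/data/", "container": "logs", "prefix": "tracking.log"}' \
        --destination_provider LRS \
        --transformer_type xapi \
        --dry_run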

.. warning::
    Events may be filtered differently by this command than in normal operation. Normally events pass through two layers of filters, as described in `getting started <docs/getting_started.rst>`_.

    First, the eventtracking ``AsyncRoutingBackend`` can have processor filters. These are ignored when running this command, since the events in the log files have already passed through the eventtracking process.

    Second, the router configuration filters work on a per-LRS basis. These are respected when the destination is ``LRS``, but ignored when writing to a file or to the loggers.


Examples
--------

**Files to LRS**

::

    # Transform all events in the local file /openedx/data/tracking.log to all configured LRSs
    python manage.py lms transform_tracking_logs \
        --source_provider LOCAL \
        --source_config '{"key": "/openedx/data/", "prefix": "tracking.log", "container": "logs"}' \
        --destination_provider LRS \
        --transformer_type xapi

::

    # Transform all events in the local file /openedx/data/tracking.log to all configured LRSs
    # using a smaller batch size and a longer sleep between batches for LMS performance
    python manage.py lms transform_tracking_logs \
        --source_provider LOCAL \
        --source_config '{"key": "/openedx/data/", "prefix": "tracking.log", "container": "logs"}' \
        --destination_provider LRS \
        --transformer_type caliper \
        --batch_size 1000 \
        --sleep_between_batches_secs 2.5

::

    # Recursively transform any files whose names start with "tracking" from the local
    # "/openedx/data/logs/" directory to a single file in the MinIO bucket "openedx",
    # under the "transformed_logs/2023-06-01/" prefix
    python manage.py lms transform_tracking_logs \
        --source_provider LOCAL \
        --source_config '{"key": "/openedx/data", "container": "logs", "prefix":"tracking"}' \
        --destination_provider MINIO \
        --destination_config '{"key": "openedx", "secret": "minio secret key", "container": "openedx", "prefix": "transformed_logs/2023-06-01/", "host": "files.local.overhang.io", "secure": false}' \
        --transformer_type xapi

    # Recursively transform any files whose names start with "tracking" from an S3 bucket
    # named "logs" to all configured LRSs
    python manage.py lms transform_tracking_logs \
        --transformer_type xapi \
        --source_provider S3 \
        --source_config '{"key": "AWS key", "secret": "AWS secret", "container": "logs", "prefix":"tracking"}' \
        --destination_provider LRS

**Files to Files**

::

    # Transform the entire local file /openedx/data/tracking.log to a new file under the local
    # directory /openedx/data/transformed_logs/; the file name will include the "2023-06-01"
    # prefix and the current timestamp.
    # Note: The "container" directory must exist!
    python manage.py lms transform_tracking_logs \
        --transformer_type caliper \
        --source_provider LOCAL \
        --source_config '{"key": "/openedx/data/", "container": "logs", "prefix": "tracking.log"}' \
        --destination_provider LOCAL \
        --destination_config '{"key": "/openedx/data/", "container": "transformed_logs", "prefix": "2023-06-01"}'

::

    # Recursively transform any files whose names start with "tracking" from a "tracking_logs"
    # directory in the MinIO bucket "openedx" to a single file under the "transformed_logs"
    # prefix of the same bucket.
    # i.e.: http://files.local.overhang.io/openedx/tracking_logs/tracking* to http://files.local.overhang.io/openedx/transformed_logs/2023-06-02/23-06-02_20-29-20_xapi.log
    # This is the configuration for a Tutor local environment with MinIO enabled.
    python manage.py lms transform_tracking_logs \
        --source_provider MINIO \
        --source_config '{"key": "openedx", "secret": "minio secret", "container": "openedx", "prefix": "/tracking_logs", "host": "files.local.overhang.io", "secure": false}' \
        --destination_provider MINIO \
        --destination_config '{"key": "openedx", "secret": "minio secret", "container": "openedx", "prefix": "transformed_logs/2023-06-02/", "host": "files.local.overhang.io", "secure": false}' \
        --transformer_type xapi

180 changes: 118 additions & 62 deletions event_routing_backends/backends/events_router.py
@@ -7,7 +7,7 @@

from event_routing_backends.helpers import get_business_critical_events
from event_routing_backends.models import RouterConfiguration
from event_routing_backends.tasks import dispatch_event, dispatch_event_persistent
from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent

logger = logging.getLogger(__name__)

@@ -28,7 +28,95 @@ def __init__(self, processors=None, backend_name=None):
self.processors = processors if processors else []
self.backend_name = backend_name

def send(self, event):
def configure_host(self, host, router):
"""
Create host_configurations for the given host and router.
"""
host['host_configurations'] = {}
host['host_configurations'].update({'url': router.route_url})
host['host_configurations'].update({'auth_scheme': router.auth_scheme})

if router.auth_scheme == RouterConfiguration.AUTH_BASIC:
host['host_configurations'].update({'username': router.username})
host['host_configurations'].update({'password': router.password})
elif router.auth_scheme == RouterConfiguration.AUTH_BEARER:
host['host_configurations'].update({'auth_key': router.auth_key})

if router.backend_name == RouterConfiguration.CALIPER_BACKEND:
host.update({'router_type': 'AUTH_HEADERS'})
if 'headers' in host:
host['host_configurations'].update({'headers': host['headers']})
elif router.backend_name == RouterConfiguration.XAPI_BACKEND:
host.update({'router_type': 'XAPI_LRS'})
else:
host.update({'router_type': 'INVALID_TYPE'})

return host

def prepare_to_send(self, events):
"""
Prepare a list of events to be sent and create a processed, filtered batch for each router.
"""
routers = RouterConfiguration.get_enabled_routers(self.backend_name)
business_critical_events = get_business_critical_events()
route_events = {}

# We continue even without routers here to allow logging of statements to happen.
# If operators do not wish to log and have no enabled routers they should set XAPI_EVENTS_ENABLED
# or CALIPER_EVENTS_ENABLED to false.
if not routers:
logger.debug('Could not find any enabled router configuration for backend %s', self.backend_name)
routers = []

for event in events:
try:
event_name = event['name']
except TypeError as exc:
raise ValueError('Expected event as dict but {type} was given.'.format(type=type(event))) from exc

try:
logger.debug(
'Processing edx event "{}" for router with backend {}'.format(event_name, self.backend_name)
)

processed_event = self.process_event(event)
except (EventEmissionExit, ValueError):
logger.error(
'Could not process edx event "%s" for backend %s\'s router',
event_name,
self.backend_name,
exc_info=True
)
continue

logger.debug(
'Successfully processed edx event "%s" for router with backend %s. Processed event: %s',
event_name,
self.backend_name,
processed_event
)

for router in routers:
host = router.get_allowed_host(event)
router_pk = router.pk

if not host:
logger.info(
'Event %s is not allowed to be sent to any host for router ID %s with backend "%s"',
event_name, router_pk, self.backend_name
)
else:
host = self.configure_host(host, router)
updated_event = self.overwrite_event_data(processed_event, host, event_name)
is_business_critical = event_name in business_critical_events
if router_pk not in route_events:
route_events[router_pk] = [(event_name, updated_event, host, is_business_critical),]
else:
route_events[router_pk].append((event_name, updated_event, host, is_business_critical))

return route_events

def bulk_send(self, events):
"""
Send the event to configured routers after processing it.
@@ -37,71 +37,125 @@ def send(self, event):
the list of hosts to which the event is required to be delivered to.
Arguments:
event (dict): original event dictionary
events (list[dict]): list of original event dictionaries
"""
routers = RouterConfiguration.get_enabled_routers(self.backend_name)

if not routers:
logger.info('Could not find any enabled router configuration for backend %s', self.backend_name)
return
event_routes = self.prepare_to_send(events)

for events_for_route in event_routes.values():
prepared_events = []
host = None
for _, updated_event, host, _ in events_for_route:
prepared_events.append(updated_event)

if prepared_events: # pragma: no cover
dispatch_bulk_events.delay(
prepared_events,
host['router_type'],
host['host_configurations']
)

try:
event_name = event['name']
except TypeError as exc:
raise ValueError('Expected event as dict but {type} was given.'.format(type=type(event))) from exc
try:
logger.debug('Processing edx event "{}" for router with backend {}'.format(event_name, self.backend_name))
def send(self, event):
"""
Send the event to configured routers after processing it.
processed_event = self.process_event(event)
Event is processed through the configured processors. A router config
object matching the backend_name and other match params is used to get
the list of hosts to which the event is required to be delivered to.
except EventEmissionExit:
logger.error(
'Could not process edx event "%s" for backend %s\'s router',
event_name,
self.backend_name,
exc_info=True
)
return

logger.debug(
'Successfully processed edx event "%s" for router with backend %s. Processed event: %s',
event_name,
self.backend_name,
processed_event
)

for router in routers:
host = router.get_allowed_host(event)

router_url = router.route_url
if not host:
logger.info(
'Event %s is not allowed to be sent to any host for router %s with backend "%s"',
event_name, router_url, self.backend_name
)
else:
updated_event = self.overwrite_event_data(processed_event, host, event_name)
host['host_configurations'] = {}
host['host_configurations'].update({'url': router_url})
host['host_configurations'].update({'auth_scheme': router.auth_scheme})

if router.auth_scheme == RouterConfiguration.AUTH_BASIC:
host['host_configurations'].update({'username': router.username})
host['host_configurations'].update({'password': router.password})
elif router.auth_scheme == RouterConfiguration.AUTH_BEARER:
host['host_configurations'].update({'auth_key': router.auth_key})

if router.backend_name == RouterConfiguration.CALIPER_BACKEND:
host.update({'router_type': 'AUTH_HEADERS'})
if 'headers' in host:
host['host_configurations'].update({'headers': host['headers']})
elif router.backend_name == RouterConfiguration.XAPI_BACKEND:
host.update({'router_type': 'XAPI_LRS'})
else:
host.update({'router_type': 'INVALID_TYPE'})
Arguments:
event (dict): the original event dictionary
"""
event_routes = self.prepare_to_send([event])

business_critical_events = get_business_critical_events()
if event_name in business_critical_events:
for events_for_route in event_routes.values():
for event_name, updated_event, host, is_business_critical in events_for_route:
if is_business_critical:
dispatch_event_persistent.delay(
event_name,
updated_event,