diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 7166077..41abb1a 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -16,6 +16,8 @@ concurrency:
 permissions:
   pages: write
   contents: read
+  packages: write
+  attestations: write
   id-token: write
 
 jobs:
@@ -43,6 +45,37 @@ jobs:
       with:
         password: ${{ secrets.PYPI_API_TOKEN }}
 
+      - name: Authenticate with the container registry
+        uses: docker/login-action@v2
+        with:
+          registry: ${{ env.REGISTRY }}
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Extract container metadata (tags, labels)
+        id: meta
+        uses: docker/metadata-action@v4
+        with:
+          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
+
+      - name: Build and push container image
+        id: push
+        uses: docker/build-push-action@v3
+        with:
+          context: .
+          push: true
+          tags: ${{ steps.meta.outputs.tags }}
+          labels: ${{ steps.meta.outputs.labels }}
+          build-args: |
+            VERSION=${{ github.ref_name }}
+
+      - name: Generate artifact attestation
+        uses: actions/attest-build-provenance@v1
+        with:
+          subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
+          subject-digest: ${{ steps.push.outputs.digest }}
+          push-to-registry: true
+
   # Finally, generate and publish documentation after a successful release.
   documentation:
     needs: release
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 317dfb2..6d4f3b1 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,6 +1,4 @@
 {
-  "python.linting.enabled": true,
-  "python.formatting.provider": "none",
   "[python]": {
     "editor.formatOnSave": true,
     "editor.codeActionsOnSave": {
diff --git a/Dockerfile b/Dockerfile
index 5b8e4e3..709cf45 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -3,6 +3,15 @@
 
 FROM python:3.12-slim
 
+# Allow build-time specification of version.
+ARG VERSION
+
+# Keep things friendly.
+LABEL org.opencontainers.image.title="Grove"
+LABEL org.opencontainers.image.description="A Software as a Service (SaaS) log collection framework."
+LABEL org.opencontainers.image.url="https://github.com/hashicorp-forge/grove"
+LABEL org.opencontainers.image.version=$VERSION
+
 # Copy in Grove ready for installation.
 WORKDIR /tmp/grove
 COPY grove grove/
diff --git a/README.md b/README.md
index 0770ef7..e0a7dab 100644
--- a/README.md
+++ b/README.md
@@ -44,6 +44,7 @@ Currently the following log sources are supported by Grove out of the box. If a
 isn't listed here, support can be added by creating a custom connector!
 
 * Atlassian audit events (e.g. Confluence, Jira)
+* FleetDM host logs
 * GitHub audit logs
 * GSuite alerts
 * GSuite activity logs
diff --git a/docs/configuration.rst b/docs/configuration.rst
index e589760..5acaa2a 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -102,6 +102,29 @@ used to assist with encoding (:code:`encoding`), disabling a connector
 Please see the :meth:`grove.models.ConnectorConfig` implementation for more
 details.
 
+Optional Fields
+^^^^^^^^^^^^^^^
+
+As specified in :meth:`grove.models.ConnectorConfig`, the following fields are optional.
+This is not an exhaustive list; it covers only the most noteworthy fields. An example
+configuration using these fields is shown below the list.
+
+ * :code:`frequency`
+
+   * The frequency, in seconds, at which a connector should be executed.
+   * If not specified, this defaults to 600 seconds.
+
+ * :code:`processors`
+
+   * Defines a list of processors which should be run.
+   * Processors enable transformation of collected log records prior to output.
+   * See the :doc:`processors` section of the documentation for more information.
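+
+As an illustrative sketch only, a connector configuration document which makes use of
+both of these optional fields might look similar to the following. The connector,
+identity, and key values shown here are placeholders rather than values from a real
+deployment:
+
+.. code-block:: json
+
+    {
+        "name": "example",
+        "connector": "example_one",
+        "identity": "examplecorp",
+        "key": "EXAMPLE-TOKEN",
+        "frequency": 300,
+        "processors": [
+            {
+                "name": "One event per log entry",
+                "processor": "split_path",
+                "source": "events"
+            }
+        ]
+    }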
+
+.. warning::
+    :code:`frequency` is still adhered to when running in scheduled mode. If Grove is
+    executed more frequently than the specified :code:`frequency`, it will not execute
+    until enough time has passed since the last execution. This is done to ensure
+    consistency between daemon mode and scheduled mode.
+
 .. _secrets:
 
 Secrets
diff --git a/docs/index.rst b/docs/index.rst
index 6e6b910..5333f50 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -83,7 +83,9 @@ isn't listed here, support can be added by creating a custom connector!
    :caption: Getting Started
 
    quickstart
+   scheduling
    configuration
+   processors
    examples
    faq
 
diff --git a/docs/processors.rst b/docs/processors.rst
new file mode 100644
index 0000000..9c7d059
--- /dev/null
+++ b/docs/processors.rst
@@ -0,0 +1,185 @@
+Processors
+===========
+
+Processors provide an optional facility for transforming collected log records before
+output. Processors are defined as part of a connector configuration document, and can
+be chained together to perform a particular set of operations in sequence.
+
+A full list of available processors can be found in the submodules section of the
+:meth:`grove.processors` documentation.
+
+.. note::
+    Custom processors can be created in the same way as plugins. This can assist when
+    performing specific processing of log records not already supported by the built-in
+    Grove processors. For more information, please see the :doc:`internals` section of
+    this documentation.
+
+Configuration
+^^^^^^^^^^^^^
+
+Processors are configured in the `processors` list inside of a connector configuration
+document. This list should contain each processor which is required to be run, in the
+desired order.
+
+Each processor requires, at a minimum, that a :code:`name` and a :code:`processor` field
+are defined. However, each processor has its own set of configuration fields which
+define how the processor should operate on a log record.
+
+To understand exactly which processor requires which fields, please refer to the
+relevant :meth:`grove.processors` documentation.
+
+
+Example
+^^^^^^^
+
+As an example of using processors together to transform collected log records, the
+following configuration flattens Google Workspace activity logs and ensures that there
+is only one event per log record:
+
+.. code-block:: json
+
+    "processors": [
+        {
+            "name": "One event per log entry",
+            "processor": "split_path",
+            "source": "events"
+        },
+        {
+            "name": "Flatten and zip event parameters",
+            "processor": "zip_paths",
+            "source": "events.parameters",
+            "key": "name",
+            "values": [
+                "value",
+                "intValue",
+                "boolValue",
+                "multiValue",
+                "multiIntValue",
+                "multiBoolValue"
+            ]
+        }
+    ]
+
+In this example, two processors are in use: `split_path` and `zip_paths`.
+
+To demonstrate the effect that these processors have on a log record, the following
+sections provide sample log records before and after processing by a given processor.
+
+split_path
+~~~~~~~~~~
+
+Split path is useful for upstream services which aggregate multiple events into a single
+log record. In these cases, a single log record returned by a service may have multiple
+events within it - rather than one event per log record. This can result in complexity
+when attempting to parse and index these records in downstream log platforms.
+
+To handle this, the :code:`split_path` processor generates a new log record for each
+event, cloning the rest of the log record.
+
+As an example, when the :code:`split_path` processor configuration defined in the
+section above is applied to the following log record:
+
+.. code-block:: json
+
+    {
+        "id": "00001",
+        "events": [
+            {
+                "operation": "create",
+                "parameters": [
+                    {"name": "username", "value": "example"},
+                    {"name": "ip", "value": "192.0.2.1"}
+                ]
+            },
+            {
+                "operation": "update",
+                "parameters": [
+                    {"name": "username", "value": "example"},
+                    {"name": "ip", "value": "192.0.2.1"}
+                ]
+            }
+        ]
+    }
+
+The record would instead be output as two log records with the following structure:
+
+.. code-block:: json
+
+    {
+        "id": "00001",
+        "events": {
+            "operation": "create",
+            "parameters": [
+                {"name": "username", "value": "example"},
+                {"name": "ip", "value": "192.0.2.1"}
+            ]
+        }
+    },
+    {
+        "id": "00001",
+        "events": {
+            "operation": "update",
+            "parameters": [
+                {"name": "username", "value": "example"},
+                {"name": "ip", "value": "192.0.2.1"}
+            ]
+        }
+    }
+
+zip_paths
+~~~~~~~~~
+
+Continuing from the example configuration and log record above, Zip Paths can be used to
+extract "generic" key / value pairs back into fields with their respective names.
+
+As an example, when the :code:`zip_paths` processor configuration defined in the section
+above is applied to the log records output by the :code:`split_path` example above:
+
+.. code-block:: json
+
+    {
+        "id": "00001",
+        "events": {
+            "operation": "create",
+            "parameters": [
+                {"name": "username", "value": "example"},
+                {"name": "ip", "value": "192.0.2.1"}
+            ]
+        }
+    },
+    {
+        "id": "00001",
+        "events": {
+            "operation": "update",
+            "parameters": [
+                {"name": "username", "value": "example"},
+                {"name": "ip", "value": "192.0.2.1"}
+            ]
+        }
+    }
+
+The following log records would be output:
+
+.. code-block:: json
+
+    {
+        "id": "00001",
+        "events": {
+            "operation": "create",
+            "parameters": {
+                "username": "example",
+                "ip": "192.0.2.1"
+            }
+        }
+    },
+    {
+        "id": "00001",
+        "events": {
+            "operation": "update",
+            "parameters": {
+                "username": "example",
+                "ip": "192.0.2.1"
+            }
+        }
+    }
diff --git a/docs/scheduling.rst b/docs/scheduling.rst
new file mode 100644
index 0000000..9fc6e83
--- /dev/null
+++ b/docs/scheduling.rst
@@ -0,0 +1,56 @@
+Scheduling
+==========
+
+In most cases, for Grove to be effective, it must be configured to run at a particular
+interval to ensure that new logs from configured sources are collected. This periodic
+collection is enabled by one of Grove's runtime modes. The two modes currently provided
+by Grove are:
+
+1. Scheduled mode.
+    * This is run using the :code:`grove` command.
+    * This executes all configured connectors once, and then exits.
+    * This mode is intended to be used in conjunction with an external scheduler, or to
+      allow a single point-in-time collection of logs for investigation and incident
+      response.
+2. Daemon mode.
+    * This is run using the :code:`groved` command.
+    * This is a long-running process which periodically executes all configured
+      connectors at their configured :code:`frequency`.
+    * This mode is intended to be run as a system service, or in a container runtime.
+
+Scheduled mode
+--------------
+
+"Scheduled" mode is executed using the :code:`grove` command.
+
+Scheduled mode has no mode-specific configuration options which affect its runtime.
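+
+As a purely illustrative sketch, assuming the :code:`grove` command is available at
+:code:`/usr/local/bin/grove` and that a collection every 10 minutes is desired, a user
+crontab entry for invoking Grove from an external scheduler might look similar to the
+following:
+
+.. code-block:: text
+
+    # Purely illustrative: run Grove every 10 minutes (path and interval are examples).
+    */10 * * * * /usr/local/bin/grove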
+
+Daemon mode
+-----------
+
+Daemon mode is executed using the :code:`groved` command - rather than :code:`grove`.
+
+In daemon mode, Grove runs as a long-running process which executes connectors at their
+configured frequency. This enables connectors to run until completion with no deadlines,
+and allows each connector to be executed at a different frequency - which may be
+important for certain types of connector which need to collect data more frequently
+than others.
+
+In daemon mode Grove has one important mode-specific configuration option. As usual,
+this is configurable using an environment variable of the same name.
+
+* :code:`GROVE_CONFIG_REFRESH`
+    * This option controls how frequently Grove will refresh connector configuration
+      documents from the configured backend.
+    * Grove keeps a copy of all connector configuration documents in memory to prevent
+      querying the configuration backend constantly in the event loop.
+    * This allows connector configuration documents to be added, removed, and modified
+      without the need to restart Grove.
+    * This option defaults to 300 seconds.
+
+.. note::
+    While connector configuration documents are kept in memory and periodically
+    refreshed, secrets are fetched every time a connector is executed - if a secrets
+    backend is also in use. This is done to enable the use of dynamic secrets engines,
+    if supported by the configured secrets backend, and to allow secrets to be rotated
+    without Grove needing to be notified or updated.
diff --git a/grove/__about__.py b/grove/__about__.py
index 5aa4356..3e5c19f 100644
--- a/grove/__about__.py
+++ b/grove/__about__.py
@@ -1,6 +1,6 @@
 """Grove metadata."""
 
-__version__ = "1.6.0"
+__version__ = "2.0.0"
 __title__ = "grove"
 __license__ = "Mozilla Public License 2.0"
-__copyright__ = "Copyright 2023 HashiCorp, Inc."
+__copyright__ = "Copyright 2025 HashiCorp, Inc."
diff --git a/grove/connectors/__init__.py b/grove/connectors/__init__.py
index 3f3744c..fff6597 100644
--- a/grove/connectors/__init__.py
+++ b/grove/connectors/__init__.py
@@ -15,6 +15,7 @@
 from grove.__about__ import __version__
 from grove.constants import (
+    CACHE_KEY_LAST,
     CACHE_KEY_LOCK,
     CACHE_KEY_POINTER,
     CACHE_KEY_POINTER_NEXT,
@@ -65,6 +66,10 @@ def __init__(self, config: ConnectorConfig, context: Dict[str, str]):
         self.configuration = config
         self.runtime_context = context
 
+        # Track the time our execution started. We use this quite a lot to ensure we
+        # are using a consistent timestamp between executions.
+        self._started = datetime.datetime.now(datetime.timezone.utc)
+
         # 'Operations' are useful for APIs which have MANY different types of data which
         # can be returned but where only a small subset are pertinent. In this case, the
         # operation can be set in the configuration to instruct grove to only collect
@@ -73,6 +78,7 @@
         self.kind = self.__class__.__module__
         self.identity = self.configuration.identity
         self.operation = self.configuration.operation
+        self.frequency = self.configuration.frequency
         self.name = self.configuration.name
 
         # Define contextual log data to be appended to all log messages.
@@ -149,6 +155,48 @@ def __init__(self, config: ConnectorConfig, context: Dict[str, str]):
         self._pointer_next = str()
         self._pointer_previous = str()
 
+    def due(self) -> bool:
+        """Checks whether a collection is (over)due.
+
+        :return: True if a run is due, False if not required.
+        """
+        # First check that a frequency is set. In daemon mode, a frequency is always
+        # required - as otherwise, we don't know how frequently to run the connector.
+ try: + _ = int(self.configuration.frequency) + except (ValueError, TypeError) as err: + raise ConfigurationException( + f"Connector '{self.kind}' has an invalid frequency set. {err}" + ) + + # If no last run is set, then always run. + try: + last = self.last + except NotFoundException: + self.logger.debug( + f"Connector '{self.kind}' does not have a last run time set.", + extra=self.log_context, + ) + return True + + # Check if the frequency between last run and now has passed. + delta = (self._started - last).seconds + self.logger.debug( + f"Connector '{self.kind}' last ran {delta} seconds ago, run frequency is " + f"{self.frequency} seconds.", + extra=self.log_context, + ) + + if delta >= self.frequency: + self.logger.info( + f"Connector '{self.kind}' is due to run, dispatching.", + extra=self.log_context, + ) + return True + + # Default to run not required. + return False + def run(self): """Connector entrypoint, called by the scheduler. @@ -173,6 +221,7 @@ def run(self): f"Connector '{self.kind}' could not complete collection successfully.", extra={"exception": err, **self.log_context}, ) + self.last = self._started self.unlock() return @@ -184,6 +233,7 @@ def run(self): self._run_reverse_chronological() # TODO: The use of a context manager for lock management would be best. + self.last = self._started self.unlock() def _run_chronological(self): @@ -517,18 +567,10 @@ def cache_key(self, prefix: str = CACHE_KEY_POINTER) -> str: :return: The constructed cache key. """ - # MD5 may not be cryptographically secure, but it works for our purposes. It's: - # - # 1) Short. - # 2) Has a low chance of (non-deliberate) collisions. - # 3) Able to be 'stringified' as hex, the character set of which is compatible - # with backends like DynamoDB. - # return ".".join( [ prefix, - self.CONNECTOR, - hashlib.md5(bytes(self.identity, "utf-8")).hexdigest(), # noqa: S324 + self.configuration.reference(), ] ) @@ -755,6 +797,40 @@ def finalize(self): }, ) + @property + def last(self) -> datetime.datetime: + """Returns the time of the last collection. + + This will return a datetime object which may be from a failure or a successful + collection. If a collection fails, we will still wait until the next collection + is due before trying again. + + :return: A datetime object representing the last collection. + """ + # Intentionally allow exceptions to bubble up so the caller can catch if the + # value is not in the cache. + value = datetime.datetime.strptime( + self._cache.get(self.cache_key(CACHE_KEY_LAST), self.operation), + DATESTAMP_FORMAT, + ) + + # Ensure there is always a timezone present on the parsed datetime object. + value = value.replace(tzinfo=datetime.timezone.utc) + + return value + + @last.setter + def last(self, value: datetime.datetime): + """Sets and saves the last collection time in cache. + + :param value: The value to save as the last collection time. + """ + self._cache.set( + self.cache_key(CACHE_KEY_LAST), + self.operation, + value.strftime(DATESTAMP_FORMAT), + ) + @property def hashes(self) -> Dict[str, set[str]]: """Return hashes for the most recently seen log entries. 
@@ -952,7 +1028,9 @@ def window_end(self, value: str): def save_window_end(self): """Saves the window end location to cache.""" self._cache.set( - self.cache_key(CACHE_KEY_WINDOW_END), self.operation, self.window_end + self.cache_key(CACHE_KEY_WINDOW_END), + self.operation, + self.window_end, ) def lock(self): @@ -963,8 +1041,7 @@ def lock(self): :raises ConcurrencyException: A valid lock is already held, likely the result of a concurrent execution of Grove. """ - now = datetime.datetime.utcnow() - expiry = now + datetime.timedelta(seconds=self._lock_duration) + expiry = self._started + datetime.timedelta(seconds=self._lock_duration) # If we don't have a lock, acquire one. current = self._lock_expiry @@ -975,11 +1052,14 @@ def lock(self): self._cache.get(self.cache_key(CACHE_KEY_LOCK), self.operation), LOCK_DATE_FORMAT, ) + + # Ensure there is always a timezone present on the parsed datetime. + current = current.replace(tzinfo=datetime.timezone.utc) except NotFoundException: pass # Someone else has the lock. - if current is not None and current >= now: + if current is not None and current >= self._started: raise ConcurrencyException( f"Valid lock already held and does not expire until {current}" ) @@ -1030,9 +1110,13 @@ def unlock(self): self._cache.get(self.cache_key(CACHE_KEY_LOCK), self.operation), LOCK_DATE_FORMAT, ) + except NotFoundException: return + # Ensure there is always a timezone present on the parsed datetime object. + current = current.replace(tzinfo=datetime.timezone.utc) + # Check if the lock matches what we expect. if current != self._lock_expiry: raise ConcurrencyException( @@ -1056,4 +1140,3 @@ def unlock(self): # Bye-bye lock. self._lock_expiry = None - self._lock_expiry = None diff --git a/grove/connectors/tines/audit_logs.py b/grove/connectors/tines/audit_logs.py index aea3655..079a90a 100644 --- a/grove/connectors/tines/audit_logs.py +++ b/grove/connectors/tines/audit_logs.py @@ -7,7 +7,7 @@ from grove.connectors import BaseConnector from grove.connectors.tines.api import Client -from grove.constants import OPERATION_DEFAULT, REVERSE_CHRONOLOGICAL +from grove.constants import DEFAULT_OPERATION, REVERSE_CHRONOLOGICAL from grove.exceptions import NotFoundException @@ -65,7 +65,7 @@ def collect(self): # Grove default is 'all'. operation = None - if self.operation != OPERATION_DEFAULT: + if self.operation != DEFAULT_OPERATION: operation = self.operation # Page over data using the cursor, saving returned data page by page. diff --git a/grove/constants.py b/grove/constants.py index 9d56dbe..656d360 100644 --- a/grove/constants.py +++ b/grove/constants.py @@ -4,6 +4,7 @@ """Constants used throughout Grove.""" # The prefix for all pointers in the cache. +CACHE_KEY_LAST = "last_run" CACHE_KEY_LOCK = "execution_lock" CACHE_KEY_SEEN = "deduplication" CACHE_KEY_POINTER = "pointer" @@ -14,9 +15,6 @@ CACHE_KEY_WINDOW_START = "window_start" CACHE_KEY_WINDOW_END = "window_end" -# The default operation name to use where none is specified. -OPERATION_DEFAULT = "all" - # The common datestamp format to use for all date operations. DATESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ" LOCK_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" @@ -36,6 +34,7 @@ ENV_GROVE_TELEMETRY_URI = "GROVE_TELEMETRY_URI" ENV_GROVE_WORKER_COUNT = "GROVE_WORKER_COUNT" ENV_GROVE_LOCK_DURATION = "GROVE_LOCK_DURATION" +ENV_GROVE_CONFIG_REFRESH = "GROVE_CONFIG_REFRESH" # Plugin groups (setuptools entrypoints). PLUGIN_GROUP_CACHE = "grove.caches" @@ -53,3 +52,12 @@ # Maximum number of connectors to execute concurrently. 
DEFAULT_WORKER_COUNT = 50 DEFAULT_LOCK_DURATION = 300 # seconds. + +# The default operation name to use where none is specified. +DEFAULT_OPERATION = "all" + +# The default interval, in seconds, to refresh connector configuration from the backend. +DEFAULT_CONFIG_REFRESH = 300 # seconds. + +# Run connectors every 10 minutes by default. +DEFAULT_CONFIG_FREQUENCY = 600 # seconds. diff --git a/grove/entrypoints/base.py b/grove/entrypoints/base.py index 67212fd..c4a7e3c 100644 --- a/grove/entrypoints/base.py +++ b/grove/entrypoints/base.py @@ -3,6 +3,7 @@ """Provides functions used between entrypoints.""" +import datetime import os import sys from concurrent.futures import ThreadPoolExecutor, as_completed @@ -26,7 +27,7 @@ from grove.models import ConnectorConfig -def dispatch(config: ConnectorConfig, context: Dict[str, str]): +def dispatch(config: ConnectorConfig, context: Dict[str, str]) -> datetime.datetime: """Executes a connector, blocking until complete. This function is intended to be called via a ThreadPoolExecutor to enable concurrent @@ -34,10 +35,16 @@ def dispatch(config: ConnectorConfig, context: Dict[str, str]): :param config: A connector configuration object for this connector thread. :param context: Contextual information relating to the current runtime. + + :return: The time the connector was scheduled. """ handler = plugin.lookup_handler(config.connector, PLUGIN_GROUP_CONNECTOR).load() instance = handler(config, context) - instance.run() + + if instance.due(): + instance.run() + + return instance.last def configure() -> List[ConnectorConfig]: @@ -123,7 +130,7 @@ def entrypoint(context: Dict[str, Any]): # Last ditch effort to catch any unhandled exceptions to ensure that they're # logged out. try: - future.result() + _ = future.result() except GroveException as err: logger.error( "Connector exited abnormally.", diff --git a/grove/entrypoints/local_daemon.py b/grove/entrypoints/local_daemon.py new file mode 100644 index 0000000..ed5d5fc --- /dev/null +++ b/grove/entrypoints/local_daemon.py @@ -0,0 +1,185 @@ +# Copyright (c) HashiCorp, Inc. +# SPDX-License-Identifier: MPL-2.0 + +"""Grove local daemon entrypoint.""" + +import datetime +import os +import socket +import sys +import time +from concurrent.futures import ThreadPoolExecutor +from typing import Dict + +from aws_lambda_powertools import Logger + +from grove.constants import ( + DEFAULT_CONFIG_REFRESH, + DEFAULT_WORKER_COUNT, + ENV_GROVE_CONFIG_REFRESH, + ENV_GROVE_WORKER_COUNT, +) +from grove.entrypoints import base +from grove.exceptions import GroveException +from grove.logging import GroveFormatter +from grove.models import Run + + +def runtime_information() -> Dict[str, str]: + """Attempts to determine the runtime, returning the relevant runtime data. + + :return: A dictionary of runtime data. + """ + # If Nomad, grab the relevant information. + if os.environ.get("NOMAD_ALLOC_ID", None): + return { + "runtime_id": os.environ.get("NOMAD_ALLOC_ID", "NOT_FOUND"), + "runtime_region": os.environ.get("NOMAD_REGION", "NOT_FOUND"), + "runtime_job_name": os.environ.get("NOMAD_JOB_NAME", "NOT_FOUND"), + } + + # If nothing else matched, assume a local process. + return { + "runtime_id": str(os.getpid()), + "runtime_host": socket.gethostname(), + } + + +def entrypoint(): + """Provides the daemon entrypoint for Grove. + + This does not use the base entrypoint, as Grove in daemon mode operates slightly + differently - as it needs to track the run state of connectors, and schedule new + executions based on their last run-time. 
+ + It may be possible to rationalise the two in future, but for now, we'll keep + things separate. + """ + context = {"runtime": __file__, **runtime_information()} + logger = Logger( + "grove", + logger_formatter=GroveFormatter(context), + stream=sys.stderr, + ) + logger.info("Grove started") + + # Get the configuration refresh frequency. + try: + refreshed_at = None + since_refresh = None + refresh_frequency = int( + os.environ.get(ENV_GROVE_CONFIG_REFRESH, DEFAULT_CONFIG_REFRESH) + ) + logger.info( + f"Configuration will be reloaded every {refresh_frequency} seconds." + ) + except ValueError as err: + logger.critical( + f"Configuration refresh ('{ENV_GROVE_CONFIG_REFRESH}') must be a number.", + extra={"exception": err}, + ) + return + + # Allow users to optionally configure the number of worker threads. + try: + workers = int(os.environ.get(ENV_GROVE_WORKER_COUNT, DEFAULT_WORKER_COUNT)) + except ValueError as err: + logger.critical( + f"Worker count ('{ENV_GROVE_WORKER_COUNT}') must be a number.", + extra={"exception": err}, + ) + return + + # All connectors will be executed in their own thread, up to the maximum configured + # workers specified by the worker count. + logger.info("Spawning thread pool for connectors", extra={"workers": workers}) + + with ThreadPoolExecutor(max_workers=workers) as pool: + runs: Dict[str, Run] = {} + + while True: + if refreshed_at: + since_refresh = datetime.datetime.now() - refreshed_at # type:ignore + + # (Re)load the configuration from the configured backend if required. + if not refreshed_at or since_refresh.seconds >= refresh_frequency: # type: ignore + try: + configurations = base.configure() + refreshed_at = datetime.datetime.now() + + logger.info("Configuration has been refreshed from the backend.") + except GroveException as err: + # On failure to refresh, we could continue to run until the next + # refresh is due in order to try and be fault tolerant. For now + # though, if we fail to refresh we'll bail as the run-time should + # reschedule us. + logger.critical( + "Failed to load configuration from backend", + extra={"exception": err}, + ) + return + + # On the first run of a execution we check if a run is due - which requires + # a round-trip to the cache backend. For subsequent runs we check if we have + # a local last dispatch time to try and avoid hitting the cache every time. + for configuration in configurations: + now = datetime.datetime.now(datetime.timezone.utc) + ref = configuration.reference(suffix=configuration.operation) + run = runs.get(ref, Run(configuration=configuration)) + + # Use the frequency from the configuration, not the local object as it + # may have been changed in the configuration. + frequency = configuration.frequency + + if run.last is None or (now - run.last).seconds >= frequency: + # If there's a valid future on the local run object a run is still + # in progress. + if run.future is not None: + continue + + # Otherwise, schedule it and track the run. If the connector isn't + # due to run, if it has run more recently in another location, then + # the local 'last' time will be replaced with the cached value when + # the future returns. + future = pool.submit(base.dispatch, configuration, context) + run.last = now + run.future = future + runs[ref] = run + + # TODO: Run objects for connectors which have their configuration documents + # deleted aren't actually removed from the runs dictionary. These won't run + # again, but should be cleaned up if removed from the configuration backend. 
+ + # Check the status of all futures. + for ref, run in runs.items(): + try: + if run.future is None or run.future.running(): + continue + + run.last = run.future.result() + run.future = None + except GroveException as err: + logger.error( + "Connector exited abnormally.", + extra={ + "exception": err, + "configuration": run.configuration.name, + "connector": run.configuration.connector, + }, + ) + + logger.info( + "Connector has exited.", + extra={ + "configuration": run.configuration.name, + "connector": run.configuration.connector, + }, + ) + + # Yield between iterations. + time.sleep(1) + + +# Support local development if called as a script. +if __name__ == "__main__": + entrypoint() diff --git a/grove/models.py b/grove/models.py index b029c87..dccf889 100644 --- a/grove/models.py +++ b/grove/models.py @@ -5,12 +5,17 @@ import base64 import binascii +import datetime +import hashlib from enum import Enum -from typing import Dict, List +from typing import Any, Dict, List, Optional from pydantic import BaseModel, Extra, Field, root_validator, validator -from grove.constants import OPERATION_DEFAULT +from grove.constants import ( + DEFAULT_CONFIG_FREQUENCY, + DEFAULT_OPERATION, +) from grove.exceptions import DataFormatException @@ -100,7 +105,10 @@ class ConnectorConfig(BaseModel, extra=Extra.allow): # Operations allow connectors and users to filter which 'type' of events to collect # from API endpoints which allow filtering records to return. - operation: str = Field(OPERATION_DEFAULT) + operation: str = Field(DEFAULT_OPERATION) + + # Frequency to execute connector if not explicitly configured. + frequency: int = Field(DEFAULT_CONFIG_FREQUENCY) # Processors allow processing of data during collection. processors: List[ProcessorConfig] = Field([]) @@ -115,6 +123,32 @@ class ConnectorConfig(BaseModel, extra=Extra.allow): } ) + def reference(self, suffix: Optional[str] = None) -> str: + """Attempt to generate a unique reference for this connector instance. + + This is used during creation of cache keys, and other values which should be + unique per connector instance. + + :param suffix: An optional suffix to append to the end of the reference. This is + is useful for handling other configuration data to the reference, such as + the operation. + """ + # MD5 may not be cryptographically secure, but it works for our purposes. It's: + # + # 1) Short. + # 2) Has a low chance of (non-deliberate) collisions. + # 3) Able to be 'stringified' as hex, the character set of which is compatible + # with backends like DynamoDB. + # + parts = [ + self.connector, + hashlib.md5(bytes(self.identity, "utf-8")).hexdigest(), + ] + if suffix is not None: + parts.append(suffix) + + return ".".join(parts) + @validator("key") def _validate_key_or_secret(cls, value, values, field): # noqa: B902 """Ensures that 'key' is set directly or a reference is present in 'secrets'. @@ -166,3 +200,21 @@ def _decode_fields(cls, values): # noqa: B902 values[field] = decode(values.get(field), encoding) return values + + +class Run(BaseModel, extra=Extra.forbid): + """Defines a model for tracking dispatched / running connectors. + + This is used when running as a daemon, in order to allow local tracking of state + and to prevent the need to constantly hit the cache backend during the main + event loop. + """ + + # The future associated with the dispatched thread, or runtime element. + future: Optional[Any] = None + + # The connector configuration for this run. 
+ configuration: ConnectorConfig + + # A date-time object representing the last time this was dispatched. + last: Optional[datetime.datetime] = None diff --git a/pyproject.toml b/pyproject.toml index bbc5a8b..798229e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,9 +56,11 @@ version = {attr = "grove.__about__.__version__"} [project.scripts] grove = "grove.entrypoints.local_process:entrypoint" +groved = "grove.entrypoints.local_daemon:entrypoint" [project.entry-points."grove.entrypoints"] aws_lambda = "grove.entrypoints.aws_lambda:entrypoint" +local_daemon = "grove.entrypoints.local_daemon:entrypoint" local_process = "grove.entrypoints.local_process:entrypoint" [project.entry-points."grove.connectors"] diff --git a/tests/test_connectors_base.py b/tests/test_connectors_base.py index 550d21d..a0fdec7 100644 --- a/tests/test_connectors_base.py +++ b/tests/test_connectors_base.py @@ -3,12 +3,13 @@ """Implements tests for the Base connector.""" +import datetime import unittest from unittest.mock import patch from grove.connectors import BaseConnector from grove.constants import REVERSE_CHRONOLOGICAL -from grove.exceptions import NotFoundException +from grove.exceptions import ConfigurationException, NotFoundException from grove.models import ConnectorConfig from tests import mocks @@ -95,6 +96,44 @@ def test_encoded_key(self): ) self.assertEqual(connector.key, "ABCDEF") + def test_due(self): + """Ensures the due method, used by the scheduler, is working as expected.""" + connector = BaseConnector( + config=ConnectorConfig( + key="test", + name="test", + identity="1FEEDFEED1", + frequency=100, + connector="example_one", + ), + context={ + "runtime": "test_harness", + "runtime_id": "NA", + }, + ) + + # Ensure we report a run is due if there is no 'last' in cache. + self.assertTrue(connector.due()) + + # Ensure we report a run is not required if run frequency has not passed. + now = datetime.datetime.now(datetime.timezone.utc) + connector.last = now + self.assertFalse(connector.due()) + + # Ensure we report a run is required if run frequency has passed. + connector.last = now - datetime.timedelta(seconds=100) + self.assertTrue(connector.due()) + + # Ensure a configuration exception is raised if no frequency is set. + connector.configuration.frequency = None + with self.assertRaises(ConfigurationException): + connector.due() + + # Ensure a configuration exception is raised if an invalid frequency is set. + connector.configuration.frequency = "a" + with self.assertRaises(ConfigurationException): + connector.due() + @patch("grove.helpers.plugin.load_handler", mocks.load_handler) def test_recover_from_incomplete(self): # Setup a connector which operates in reverse chronological mode. @@ -128,27 +167,27 @@ def test_recover_from_incomplete(self): # Setup the cache to match the described failure case. 
connector._cache.set( - pk="pointer_next.test.06dc0fd3c08a2bc6a33f5460da9fea10", + pk="pointer_next.example_one.06dc0fd3c08a2bc6a33f5460da9fea10", sk="all", value="2022-11-20T20:31:18.382Z", ) connector._cache.set( - pk="window_start.test.06dc0fd3c08a2bc6a33f5460da9fea10", + pk="window_start.example_one.06dc0fd3c08a2bc6a33f5460da9fea10", sk="all", value="2022-11-20T20:31:18.382Z", ) connector._cache.set( - pk="pointer_previous.test.06dc0fd3c08a2bc6a33f5460da9fea10", + pk="pointer_previous.example_one.06dc0fd3c08a2bc6a33f5460da9fea10", sk="all", value="2022-11-20T20:30:10.772Z", ) connector._cache.set( - pk="window_end.test.06dc0fd3c08a2bc6a33f5460da9fea10", + pk="window_end.example_one.06dc0fd3c08a2bc6a33f5460da9fea10", sk="all", value="2022-11-20T20:30:10.772Z", ) connector._cache.set( - pk="pointer.test.06dc0fd3c08a2bc6a33f5460da9fea10", + pk="pointer.example_one.06dc0fd3c08a2bc6a33f5460da9fea10", sk="all", value="2022-11-20T20:31:18.382Z", ) @@ -163,15 +202,15 @@ def test_recover_from_incomplete(self): # Clean-up to simulate a fresh run without having to re-create everything in # a subsequent test case. connector._cache.delete( - pk="window_start.test.06dc0fd3c08a2bc6a33f5460da9fea10", + pk="window_start.example_one.06dc0fd3c08a2bc6a33f5460da9fea10", sk="all", ) connector._cache.delete( - pk="window_end.test.06dc0fd3c08a2bc6a33f5460da9fea10", + pk="window_end.example_one.06dc0fd3c08a2bc6a33f5460da9fea10", sk="all", ) connector._cache.delete( - pk="pointer_next.test.06dc0fd3c08a2bc6a33f5460da9fea10", + pk="pointer_next.example_one.06dc0fd3c08a2bc6a33f5460da9fea10", sk="all", ) connector._window_end = "" diff --git a/tests/test_connectors_lock.py b/tests/test_connectors_lock.py index de03b91..82b15a8 100644 --- a/tests/test_connectors_lock.py +++ b/tests/test_connectors_lock.py @@ -4,6 +4,7 @@ """Implements tests for connector locking.""" import os +import tempfile import time import unittest from unittest.mock import patch @@ -18,6 +19,11 @@ class ConnectorLockingTestCase(unittest.TestCase): """Implements tests for connector locking.""" + def setUp(self): + """Ensure a local file cache handler is used for testing.""" + self.path = tempfile.TemporaryDirectory() + os.environ["GROVE_CACHE_LOCAL_FILE_PATH"] = self.path.name + @patch("grove.helpers.plugin.load_handler", mocks.load_handler) def test_lock_acquire(self): """Ensures locks can be acquired and concurrent executions are prevented.""" @@ -73,8 +79,8 @@ def test_lock_takeover(self): first_execution = BaseConnector(config=config, context=context) first_execution.lock() - # Attempt to acquire a lock for the same connector instance - after the lock - # has expired. This should succeed due to expiry. + # Attempt to acquire a lock while a lock is still held by the first execution. + # this should fail. second_execution = BaseConnector(config=config, context=context) # In-memory cache does not support locking as it's only intended for local @@ -91,8 +97,10 @@ def test_lock_takeover(self): # Wait to ensure the lock has expired. time.sleep(5) - # Ensure the lock is taken-over successfully after expiration. + # Now ensure that the lock is able to be taken-over after expiration. + second_execution = BaseConnector(config=config, context=context) second_execution.lock() + first_execution._cache._data = second_execution._cache._data # Ensure subsequent attempts of the first fail due to this lock takeover. 
with self.assertRaises(ConcurrencyException): @@ -135,10 +143,5 @@ def test_lock_unlock(self): with self.assertRaises(ConcurrencyException): second_execution.lock() - # Unlock the first connector, and then ensure the second calling lock does not - # error. First though, we need to clone the newly updated cache again. - # - # TODO: Remove the need for this, as it's going to cause confusion in future. first_execution.unlock() - second_execution._cache._data = first_execution._cache._data second_execution.lock()