diff --git a/docs/source/ceda.png b/docs/source/ceda.png new file mode 100644 index 00000000..5274a252 Binary files /dev/null and b/docs/source/ceda.png differ diff --git a/docs/source/conf.py b/docs/source/conf.py index 8e2e240f..8cbae625 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -54,4 +54,10 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] \ No newline at end of file +html_static_path = ['_static'] + +html_logo = "ceda.png" +html_theme_options = { + 'logo_only': True, + 'display_version': False, +} \ No newline at end of file diff --git a/docs/source/cta-emulator.rst b/docs/source/cta-emulator.rst new file mode 100644 index 00000000..0b855bff --- /dev/null +++ b/docs/source/cta-emulator.rst @@ -0,0 +1,187 @@ +CERN Tape Archive Set Up +======================== + +As part of the development of the NLDS, a tape emulator was set up to better +understand how to interact with ``xrootd`` and the CERN tape archive. The +following instructions detail how to set up such a tape emulator and are adapted +from the instructions on [the CTA repo] +(https://gitlab.cern.ch/cta/CTA/-/tree/main/continuousintegration/buildtree_runner) +and those provided by STFC's Scientific Computing Department at the Rutherford +Appleton Laboratory. There are two major parts: commissioning the virtual +machine, and setting it up appropriately according to the CERN instructions. + + +Commissioning the VM +-------------------- + +These instructions are specifically for commissioning a VM on the STFC openstack +cloud interface. For other machines, slightly different instructions will need +to be followed. + +After you have logged in to the openstack interface, you click on the "Launch +Instance" button on the top to create the VM and then: + +1. In the "Details" tab, give your VM a suitable name +2. In the "Source" tab, select scientificlinux-7-aq as a source image +3. In the "Flavour" tab, select a VM type depending on how many VCPUs, RAM and + disk size you need +4. In the "Networks" tab, select "Internal" +5. In the "Key Pair" tab, upload your public rsa ssh key so you can login to the + VM once it is created. +6. In the "Metadata" tab, click on the pull-down menu "Aquilon Image Properties" + and then set it to "Aquilon Archetype", specifying ``ral-tier1``, and also + "Aquilon Personality" specifying it as ``eoscta_ci_cd``. Note that you will + have to manually write these values out so it's worth copy pasting to avoid + typos! +7. Press the "Launch Instance" button and the VM will be created. Give it some + time so that quattor runs - quattor being a vm management tool like Puppet. + It may also need a reboot at some point. + +This setup produces a vm which requires logging in as your openstack username - +in most cases this will be your STFC federal ID. You will be able to sudo +assuming the machine remains in the correct configuration. + +**Note:** +The above setup is one that theoretically works. However, the machine which – +after some attempts – successfully had CTA installed on it had to be +commissioned manually by SCD so that + +(a) quattor could be made sure to have run successfully and then subsequently + disabled +(b) I would be able to log in as ``root`` thus negating the need to edit the + sudoers file after each reboot. 
+ +I would strongly recommend this approach if SCD are agreeable to commissioning +your vm for you. + + +Setting up CTA on the VM +------------------------ + +The following are the working set of instructions at time of writing, provided +by SCD at RAL. + +* **Ensure quattor is not running** + + There should be an empty file at ``/etc/noquattor``, if there is not one then + create it with + + ``sudo touch /etc/noquattor`` + +* **Clone the CTA gitlab repo** + + ``git clone https://gitlab.cern.ch/cta/CTA.git`` + +* **User environment** + + As per instructions + + ``cd ./CTA/continuousintegration/buildtree_runner/vmBootstrap`` + + BUT in bootstrapSystem.sh, delete/comment line 46 and then + + ``./bootstrapSystem.sh`` + + When prompted for password, press return (i.e don't give one). This creates + the ``cta`` user and adds them to sudoers + + +* **CTA build tree** + + As per instructions + + ``su - cta`` + ``cd ~/CTA/continuousintegration/buildtree_runner/vmBootstrap`` + + BUT edit lines 54,55 in bootstrapCTA.sh to look like + + ``sudo wget https://public-yum.oracle.com/RPM-GPG-KEY-oracle-ol7 -O /etc/pki/rpm-gpg/RPM-GPG-KEY-oracle --no-check-certificate`` + ``sudo wget https://download.ceph.com/keys/release.asc -O /etc/pki/rpm-gpg/RPM-ASC-KEY-ceph --no-check-certificate`` + + Note the change in the URL on line 55 from ``git.ceph.com`` to + ``download.ceph.com``, as well as the addition of the ``--no-check-certificate`` + flag. + + Then run bootstrapCTA.sh (without any args) + + ``./bootstrapCTA.sh`` + + +* **Install MHVTL** + + As per instructions + + ``cd ~/CTA/continuousintegration/buildtree_runner/vmBootstrap`` + ``./bootstrapMHVTL.sh`` + +* **Kubernetes setup** + + As per instructions + + ``cd ~/CTA/continuousintegration/buildtree_runner/vmBootstrap`` + ``./bootstrapKubernetes.sh`` + + and reboot host + + ``sudo reboot`` + +* **Docker image** + + Depending on how your machine was set up you may now need to ensure that + quattor is still disabled (i.e. that the ``/etc/noquattor`` file still exists) + and that the cta user is still in the sudoers file. This will not be necessary + if you are running as ``root``. + + Then, as per instructions + + ``su - cta`` + ``cd ~/CTA/continuousintegration/buildtree_runner`` + + BUT edit lines 38,39 in /home/cta/CTA/continuousintegration/docker/ctafrontend/cc7/buildtree-stage1-rpms-public/Dockerfile to look like + + ``RUN wget https://public-yum.oracle.com/RPM-GPG-KEY-oracle-ol7 -O /etc/pki/rpm-gpg/RPM-GPG-KEY-oracle --no-check-certificate`` + ``RUN wget https://download.ceph.com/keys/release.asc -O /etc/pki/rpm-gpg/RPM-ASC-KEY-ceph --no-check-certificate`` + + then run the master script to prepare all the Docker images. 
+ + ``./prepareImage.sh`` + +* **Preparing the environment (MHVTL, kubernetes volumes...)** + + As per instructions + + ``cd ~/CTA/continuousintegration/buildtree_runner`` + ``sudo ./recreate_buildtree_running_environment.sh`` + +* **Preparing the CTA instance** + + As per instructions + + ``cd ~/CTA/continuousintegration/orchestration`` + ``sudo ./create_instance.sh -n cta -b ~ -B CTA-build -O -D -d internal_postgres.yaml`` + + This may work first time but it never did for me, so the fix is to then run + + ``./delete_instance.sh -n cta`` + + To remove the instance and then re-create it with the same command as above + ``sudo ./create_instance.sh -n cta -b ~ -B CTA-build -O -D -d internal_postgres.yaml`` + + This can be verified to be working with a call to + + ``kubectl -n cta get pods`` + + which should return a list of the working pods, looking something like: + + ============ ======== ======== ========= === + NAME READY STATUS RESTARTS AGE + ============ ======== ======== ========= === + client 1/1 Running 0 35m + ctacli 1/1 Running 0 35m + ctaeos 1/1 Running 0 35m + ctafrontend 1/1 Running 0 35m + kdc 1/1 Running 0 35m + postgres 1/1 Running 0 36m + tpsrv01 2/2 Running 0 35m + tpsrv02 2/2 Running 0 35m + ============ ======== ======== ========= === diff --git a/docs/source/index.rst b/docs/source/index.rst index c65e4142..ec4f2af3 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -8,14 +8,22 @@ Welcome to Near-line Data Store's documentation! .. toctree:: :maxdepth: 2 - :caption: Contents: + :caption: Contents - Getting started + Getting started Specification NLDS Server API NLDS Processors API +.. toctree:: + :maxdepth: 2 + :caption: Advanced + + The server config file + Server config examples + Setting up a CTA tape emulator + Indices and tables ================== diff --git a/docs/source/nlds-processors.rst b/docs/source/nlds-processors.rst index a2a982ad..d8a861f3 100644 --- a/docs/source/nlds-processors.rst +++ b/docs/source/nlds-processors.rst @@ -1,5 +1,7 @@ -Core content of the nlds-processors -=================================== +Microservices +============= + +Core content of the nlds-processors. The Consumer class ------------------ @@ -10,7 +12,7 @@ The Consumer class The processors -------------- -Also referred to as 'microservices' +Also referred to as 'microservices', 'consumers', or 'workers': .. automodule:: nlds_processors.nlds_worker :members: @@ -18,13 +20,34 @@ Also referred to as 'microservices' .. automodule:: nlds_processors.index :members: -.. automodule:: nlds_processors.transfer +.. automodule:: nlds_processors.transferers.base_transfer + :members: + +.. automodule:: nlds_processors.transferers.put_transfer + :members: + +.. automodule:: nlds_processors.transferers.get_transfer + :members: + +.. automodule:: nlds_processors.db_mixin + :members: + +.. automodule:: nlds_processors.catalog.catalog_models + :members: + +.. automodule:: nlds_processors.catalog.catalog + :members: + +.. automodule:: nlds_processors.catalog.catalog_worker + :members: + +.. automodule:: nlds_processors.monitor.monitor_models :members: -.. automodule:: nlds_processors.catalog +.. automodule:: nlds_processors.monitor.monitor :members: -.. automodule:: nlds_processors.monitor +.. automodule:: nlds_processors.monitor.monitor_worker :members: .. 
automodule:: nlds_processors.logger diff --git a/docs/source/nlds-server.rst b/docs/source/nlds-server.rst index 6cd59ab4..b9d7851b 100644 --- a/docs/source/nlds-server.rst +++ b/docs/source/nlds-server.rst @@ -1,5 +1,7 @@ -Core content of the nlds-server -=============================== +NLDS API-server +=============== + +The core content of the NLDS API-server run using FastAPI. The Publisher class ------------------- @@ -28,7 +30,7 @@ The authenticators :members: :undoc-members: -Authenicate methods also contains 3 general methods, used by the above 2 +Authenticate methods also contains 3 general methods, used by the above 2 modules, to validate the given user, group and token. diff --git a/docs/source/server-config/examples.rst b/docs/source/server-config/examples.rst new file mode 100644 index 00000000..62fc8130 --- /dev/null +++ b/docs/source/server-config/examples.rst @@ -0,0 +1,224 @@ + +Examples +======== + +Local NLDS +---------- + +What follows is an example server config file to use for a local, development +version of an NLDS system where all consumers are running concurrently on one +machine - likely a laptop or single vm. This file would be saved at +``/etc/server_config``:: + + { + "authentication" : { + "authenticator_backend" : "jasmin_authenticator", + "jasmin_authenticator" : { + "user_profile_url" : "[REDACTED]", + "user_services_url" : "[REDACTED]", + "oauth_token_introspect_url" : "[REDACTED]" + } + }, + "index_q":{ + "logging":{ + "enable": true + }, + "filelist_threshold": 10000, + "check_permissions_fl": true, + "use_pwd_gid_fl": true, + "retry_delays": [ + 0, + 1, + 2 + ] + }, + "nlds_q":{ + "logging":{ + "enable": true + } + }, + "transfer_put_q":{ + "logging":{ + "enable": true + }, + "tenancy": "example-tenancy.s3.uk", + "require_secure_fl": false, + "use_pwd_gid_fl": true, + "retry_delays": [ + 0, + 1, + 2 + ] + }, + "transfer_get_q":{ + "logging":{ + "enable": true + }, + "tenancy": "example-tenancy.s3.uk", + "require_secure_fl": false, + "use_pwd_gid_fl": true + }, + "monitor_q":{ + "db_engine": "sqlite", + "db_options": { + "db_name" : "//Users/jack.leland/nlds/nlds_monitor.db", + "db_user" : "", + "db_passwd" : "", + "echo": false + }, + "logging":{ + "enable": true + } + }, + "logging":{ + "log_level": "debug" + }, + "logging_q":{ + "logging":{ + "log_level": "debug", + "add_stdout_fl": false, + "stdout_log_level": "warning", + "log_files": [ + "logs/nlds_q.txt", + "logs/index_q.txt", + "logs/catalog_q.txt", + "logs/monitor_q.txt", + "logs/transfer_put_q.txt", + "logs/transfer_get_q.txt", + "logs/logging_q.txt", + "logs/api_server.txt" + ] + } + }, + "catalog_q":{ + "db_engine": "sqlite", + "db_options": { + "db_name" : "//Users/jack.leland/nlds/nlds_catalog.db", + "db_user" : "", + "db_passwd" : "", + "echo": false + }, + "retry_delays": [ + 0, + 1, + 2 + ], + "logging":{ + "enable": true + } + }, + "rabbitMQ": { + "user": "full_access", + "password": "passwordletmein123", + "server": "130.246.3.98", + "vhost": "delayed-test", + "exchange": { + "name": "test_exchange", + "type": "topic", + "delayed": true + }, + "queues": [ + { + "name": "nlds_q", + "bindings": [ + { + "exchange": "test_exchange", + "routing_key": "nlds-api.route.*" + }, + { + "exchange": "test_exchange", + "routing_key": "nlds-api.*.complete" + }, + { + "exchange": "test_exchange", + "routing_key": "nlds-api.*.failed" + } + ] + }, + { + "name": "monitor_q", + "bindings": [ + { + "exchange": "test_exchange", + "routing_key": "*.monitor-put.start" + }, + { + "exchange": "test_exchange", 
+ "routing_key": "*.monitor-get.start" + } + ] + }, + { + "name": "index_q", + "bindings": [ + { + "exchange": "test_exchange", + "routing_key": "*.index.start" + }, + { + "exchange": "test_exchange", + "routing_key": "*.index.init" + } + ] + }, + { + "name": "catalog_q", + "bindings": [ + { + "exchange": "test_exchange", + "routing_key": "*.catalog-put.start" + }, + { + "exchange": "test_exchange", + "routing_key": "*.catalog-get.start" + }, + { + "exchange": "test_exchange", + "routing_key": "*.catalog-del.start" + } + ] + }, + { + "name": "transfer_put_q", + "bindings": [ + { + "exchange": "test_exchange", + "routing_key": "*.transfer-put.start" + } + ] + }, + { + "name": "transfer_get_q", + "bindings": [ + { + "exchange": "test_exchange", + "routing_key": "*.transfer-get.start" + } + ] + }, + { + "name": "logging_q", + "bindings": [ + { + "exchange": "test_exchange", + "routing_key": "*.log.*" + } + ] + } + ] + }, + "rpc_publisher": { + "queue_exclusivity_fl": true + } + } + +Note that this is purely an example and doesn't necessarily use all features +within the NLDS. For example, several individual consumers have ``retry_delays`` +set but not generic ``retry_delays`` is set in the ``general`` section. Note +also that the jasmin authenication configuration is redacted for security +purposes. + +Distributed NLDS +---------------- + +COMING SOON \ No newline at end of file diff --git a/docs/source/server-config/server-config.rst b/docs/source/server-config/server-config.rst new file mode 100644 index 00000000..f04ff033 --- /dev/null +++ b/docs/source/server-config/server-config.rst @@ -0,0 +1,352 @@ +Server config +============= + +The server config file controls the configurable behaviour of the NLDS. It is a +json file split into dictionary sections, with each section delineating +configuration for a specific part of the program. There is an example +server_config in the templates section of the main nlds package +(``nlds.templates.server_config``) to get you started, but this page will +demystify the configuration needed for (a) a local development copy of the nlds, +and (b) a production system spread across several pods/virtual machines. + +*Please note that the NLDS is in active development and all of this is subject +to change with no notice.* + +Required sections +----------------- + +There are two required sections for every server_config: ``authentication`` and +``rabbitMQ``. + +Authentication +^^^^^^^^^^^^^^ +This deals with how users are authenticated through the OAuth2 flow used in the +client. The following fields are required in the dictionary:: + + "authentication" : { + "authenticator_backend" : "jasmin_authenticator", + "jasmin_authenticator" : { + "user_profile_url" : "{{ user_profile_url }}", + "user_services_url" : "{{ user_services_url }}", + "oauth_token_introspect_url" : "{{ token_introspect_url }}" + } + } + +where ``authenticator_backend`` dictates which form of authentication you would +like to use. Currently the only implemented authenticator is the +``jasmin_authenticator``, but there are plans to expand this to also work with +other industry standard authenticators like google and microsoft. + +The authenticator setup is then specified in a separate dictionary named after +the authenticator, which is specific to each authenticator. The +``jasmin_authenticator`` requires, as above, values for ``user_profile_url``, +``user_services_url``, and ``oauth_token_introspect_url``. 
+These cannot be divulged publicly on GitHub for JASMIN, so please get in
+contact for the actual values to use.
+
+RabbitMQ
+^^^^^^^^
+
+This deals with how the NLDS connects to the RabbitMQ queue and message
+brokering system. The following is an outline of what is required::
+
+    "rabbitMQ": {
+        "user": "{{ rabbit_user }}",
+        "password": "{{ rabbit_password }}",
+        "server": "{{ rabbit_server }}",
+        "vhost": "{{ rabbit_vhost }}",
+        "exchange": {
+            "name": "{{ rabbit_exchange_name }}",
+            "type": "{{ rabbit_exchange_type }}",
+            "delayed": "{{ rabbit_exchange_delayed }}"
+        },
+        "queues": [
+            {
+                "name": "{{ rabbit_queue_name }}",
+                "bindings": [
+                    {
+                        "exchange": "{{ rabbit_exchange_name }}",
+                        "routing_key": "{{ rabbit_queue_routing_key }}"
+                    }
+                ]
+            }
+        ]
+    }
+
+Here the ``user`` and ``password`` fields refer to the username and password for
+the rabbit server you wish to connect to, which is in turn specified with
+``server``. ``vhost`` is similarly the virtual host on the rabbit server that
+you wish to connect to.
+
+The next two dictionaries are context specific. All publishing elements of the
+NLDS, i.e. parts that will send messages, will require an exchange to publish
+messages to. ``exchange`` determines that exchange, with three required
+subfields: ``name``, ``type``, and ``delayed``. The former two are
+self-descriptive: they should just be the name of the exchange on the virtual
+host and its corresponding type, e.g. one of fanout, direct or topic.
+``delayed`` is a boolean (``true`` or ``false`` in json-speak) dictating whether
+to use the delay functionality utilised within the NLDS. Note that this
+requires the rabbit server to have the DelayedRabbitExchange plugin installed.
+
+Exchanges can be declared and created on the virtual host the first time the
+NLDS is run if they are not already present; virtual hosts cannot, and so will
+have to be created manually beforehand on the server or through the admin
+interface. If an exchange is requested but incorrect information is given about
+either its ``type`` or ``delayed`` status, then the NLDS will throw an error.
+
+``queues`` is a list of queue dictionaries and must be implemented on consumers,
+i.e. message processors, to tell ``pika`` where to take messages from. Each
+queue dictionary consists of a ``name`` and a list of ``bindings``, with each
+``binding`` being a dictionary containing the name of the ``exchange`` the queue
+takes messages from, and the routing key that a message must have to be accepted
+onto the queue. For more information on exchanges, routing keys, and other
+RabbitMQ features, please see `Rabbit's excellent documentation
+<https://www.rabbitmq.com/tutorials/tutorial-five-python.html>`_.
+
+
+Generic optional sections
+-------------------------
+
+There are two generic sections, i.e. sections which are used across the NLDS
+ecosystem but are optional and therefore fall back on a default configuration
+if not specified. These are ``logging`` and ``general``.
+
+Logging
+^^^^^^^
+
+The logging configuration options look like the following::
+
+    "logging": {
+        "enable": boolean,
+        "log_level": str - ("none" | "debug" | "info" | "warning" | "error" | "critical"),
+        "log_format": str - see python logging docs for details,
+        "add_stdout_fl": boolean,
+        "stdout_log_level": str - ("none" | "debug" | "info" | "warning" | "error" | "critical"),
+        "log_files": List[str],
+        "rollover": str - see python logging docs for details
+    }
+
+These all set default options for the native python logging system, with
+``log_level`` being the log level, ``log_format`` being a string describing the
+log output format, and ``rollover`` describing the frequency of rollover for log
+files in the standard manner. For details on all of this, see the python docs
+for inbuilt logging. ``enable`` and ``add_stdout_fl`` are boolean flags
+controlling log output to files and ``stdout`` respectively, and
+``stdout_log_level`` is the log level for the stdout logging, if you require it
+to be different from the default log level.
+
+``log_files`` is a list of strings describing the path or paths to log files
+being written to. If no log file paths are given then no file logging will be
+done. If active, the file logging will be done with a TimedRotatingFileHandler,
+i.e. the files will be rotated on a rolling basis, with the rollover time
+denoted by the ``rollover`` option, which is a time string similar to that found
+in crontab. Please see the `python logging docs
+<https://docs.python.org/3/library/logging.handlers.html#logging.handlers.TimedRotatingFileHandler>`_
+for more info on this.
+
+As stated, these all set the default log options for all publishers and
+consumers within the NLDS - these can be overridden on a consumer-specific basis
+by inserting a ``logging`` sub-dictionary into a consumer-specific optional
+section.
+
+General
+^^^^^^^
+
+The general config, as of writing this page, only covers one option: the
+``retry_delays`` list::
+
+    "general": {
+        "retry_delays": List[int]
+    }
+
+This retry delays list gives the delay applied to retried messages in seconds,
+with the nth element being the delay for the nth retry. Setting the value
+here sets a default for *all* consumers, but the ``retry_delays`` option can be
+inserted into any consumer-specific config section to override this.
+
+Consumer-specific optional sections
+-----------------------------------
+
+Each of the consumers has its own configuration dictionary, named by
+convention as ``{consumername}_q``, e.g. ``transfer_put_q``. Each has a set of
+default options and will accept both a ``logging`` dictionary and a
+``retry_delays`` list for consumer-specific override of the default options
+mentioned above. Each consumer also has a specific set of config options, some
+shared, which will control its behaviour. The following is a brief rundown of
+the server config options for each consumer.
+
+NLDS Worker
+^^^^^^^^^^^
+The server config section is ``nlds_q``, and the following options are available::
+
+    "nlds_q": {
+        "logging": [standard_logging_dictionary],
+        "retry_delays": List[int],
+        "print_tracebacks_fl": boolean
+    }
+
+Not much specifically happens in the NLDS worker that requires configuration, so
+it essentially just has the default settings. One option that has not been
+covered yet, ``print_tracebacks_fl``, is a boolean flag to control whether the
+full stacktrace of any caught exception is sent to the logger. This is a
+standard across all consumers.
+You may set ``retry_delays`` if you wish, but the NLDS worker doesn't retry
+messages as part of its normal operation, only in the case of something going
+unexpectedly wrong.
+
+Indexer
+^^^^^^^
+
+Server config section is ``index_q``, and the following options are available::
+
+    "index_q": {
+        "logging": {standard_logging_dictionary},
+        "retry_delays": List[int],
+        "print_tracebacks_fl": boolean,
+        "filelist_max_length": int,
+        "message_threshold": int,
+        "max_retries": int,
+        "check_permissions_fl": boolean,
+        "check_filesize_fl": boolean,
+        "use_pwd_gid_fl": boolean
+    }
+
+where ``logging``, ``retry_delays``, and ``print_tracebacks_fl`` are, as above,
+standard configurables within the NLDS consumer ecosystem.
+``filelist_max_length`` determines the maximum length that any file-list
+provided to the indexer consumer during the ``init`` (i.e. ``split``) step can
+be. Any transaction whose initial list is longer than this value will be split
+into several sub-transactions, each no longer than this maximum. For example,
+with the default value of 1000, a transaction with an initial list size of 2500
+will be split into 3 sub-transactions: two containing 1000 files each and a
+third containing the remaining 500 files.
+
+``message_threshold`` is very similar in that it places a limit on the total
+size of files within a given filelist. It is applied at the indexing
+(``nlds.index``) step when files have actually been statted, and so will further
+sub-divide any sub-transactions at that point if they are too large or are
+revealed to contain lots of folders with files in upon indexing. ``max_retries``
+controls the maximum number of times indexing can be attempted for an entry in
+a filelist, either because it doesn't exist or because the user doesn't have
+the appropriate permissions to access it at time of indexing. This feeds into
+retry delays, as each subsequent time a sub-transaction is retried it will be
+delayed by the amount specified at that index within the ``retry_delays`` list.
+If ``max_retries`` exceeds ``len(retry_delays)``, then any retries which don't
+have an explicit retry delay to use will use the final element in the
+``retry_delays`` list.
+
+``check_permissions_fl`` and ``check_filesize_fl`` are commonly used boolean
+flags to control whether the indexer checks the permissions and filesize of
+files respectively during the indexing step.
+
+``use_pwd_gid_fl`` is a final boolean flag which controls how permissions
+checking goes about getting the gid to check group permissions against. If
+true, it will *just* use the gid found in the ``pwd`` table on whichever
+machine the indexer is running on. If false, then this gid is used *as well as*
+all of those found using the ``os.groups`` command - which will read all groups
+found on the machine the indexer is running on.
+
+
+Cataloguer
+^^^^^^^^^^
+
+The server config entry for the catalog consumer is as follows::
+
+    "catalog_q": {
+        "logging": {standard_logging_dictionary},
+        "retry_delays": List[int],
+        "print_tracebacks_fl": boolean,
+        "db_engine": str,
+        "db_options": {
+            "db_name" : str,
+            "db_user" : str,
+            "db_passwd" : str,
+            "echo": boolean
+        },
+        "max_retries": int
+    }
+
+where ``logging``, ``retry_delays``, and ``print_tracebacks_fl`` are, as above,
+standard configurables within the NLDS consumer ecosystem. ``max_retries`` is
+similarly available in the cataloguer, with the same meaning as above.
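+
+As a concrete illustration, a local development deployment might point the
+cataloguer at an SQLite database file (a minimal sketch adapted from the local
+example on the examples page; the database path and retry values are purely
+illustrative)::
+
+    "catalog_q": {
+        "db_engine": "sqlite",
+        "db_options": {
+            "db_name" : "//home/nlds/nlds_catalog.db",
+            "db_user" : "",
+            "db_passwd" : "",
+            "echo": false
+        },
+        "retry_delays": [0, 1, 2],
+        "logging": {
+            "enable": true
+        }
+    }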
+
+Here we also have two keys which control database behaviour via SQLAlchemy:
+``db_engine`` and ``db_options``. ``db_engine`` is a string which specifies
+which SQL flavour you would like SQLAlchemy to use. Currently this has been
+tried with SQLite and PostgreSQL but, given how SQLAlchemy works, we expect few
+roadblocks interacting with other database types. ``db_options`` is a further
+sub-dictionary specifying the database name (which must be appropriate for
+your chosen flavour of database), along with the database username and password
+(if in use), respectively controlled by the keys ``db_name``, ``db_user``, and
+``db_passwd``. Finally in this sub-dictionary, ``echo`` is an optional boolean
+flag which controls the auto-logging of the SQLAlchemy engine.
+
+
+Transfer-put and Transfer-get
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The server entry for the transfer-put consumer is as follows::
+
+    "transfer_put_q": {
+        "logging": {standard_logging_dictionary},
+        "max_retries": int,
+        "retry_delays": List[int],
+        "print_tracebacks_fl": boolean,
+        "filelist_max_length": int,
+        "check_permissions_fl": boolean,
+        "use_pwd_gid_fl": boolean,
+        "tenancy": "cedadev-o.s3.jc.rl.ac.uk",
+        "require_secure_fl": false
+    }
+
+where ``logging``, ``retry_delays`` and ``print_tracebacks_fl`` have their
+standard definitions as above, and ``max_retries``, ``filelist_max_length``,
+``check_permissions_fl``, and ``use_pwd_gid_fl`` are defined the same as for
+the Indexer consumer.
+
+New options for the transfer processor are ``tenancy`` and
+``require_secure_fl``, which control ``minio`` behaviour. ``tenancy`` is a
+string which denotes the address of the object store tenancy to upload/download
+files to/from, and ``require_secure_fl`` specifies whether or not you require
+signed SSL certificates at the tenancy location.
+
+
+Monitor
+^^^^^^^
+
+The server config entry for the monitor consumer is as follows::
+
+    "monitor_q": {
+        "logging": {standard_logging_dictionary},
+        "retry_delays": List[int],
+        "print_tracebacks_fl": boolean,
+        "db_engine": str,
+        "db_options": {
+            "db_name" : str,
+            "db_user" : str,
+            "db_passwd" : str,
+            "echo": boolean
+        }
+    }
+
+where ``logging``, ``retry_delays``, and ``print_tracebacks_fl`` have the
+standard, previously stated definitions, and ``db_engine`` and ``db_options``
+are as defined for the Catalog consumer - due to the use of an SQL database on
+the Monitor. Note the minimal retry control, as the monitor only retries
+messages which failed due to an unexpected exception.
+
+Logger
+^^^^^^
+
+And finally, the server config entry for the Logger consumer is as follows::
+
+    "logging_q": {
+        "logging": {standard_logging_dictionary},
+        "print_tracebacks_fl": boolean
+    }
+
+where the options have been previously defined. Note that there is no special
+configurable behaviour on the Logger consumer as it is simply a relay for
+redirecting logging messages into log files. It should also be noted that the
+``log_files`` option should be set in the ``logging`` sub-dictionary for this
+to work properly, which may be a mandatory setting in future versions.
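+
+For instance, a Logger configuration which collects each consumer's output into
+its own file might look like the following (a minimal sketch assuming a
+relative ``logs/`` directory exists on the machine running the Logger; adapt
+the paths to your own deployment)::
+
+    "logging_q": {
+        "logging": {
+            "enable": true,
+            "log_level": "debug",
+            "add_stdout_fl": false,
+            "log_files": [
+                "logs/nlds_q.txt",
+                "logs/index_q.txt",
+                "logs/catalog_q.txt",
+                "logs/monitor_q.txt",
+                "logs/transfer_put_q.txt",
+                "logs/transfer_get_q.txt",
+                "logs/logging_q.txt",
+                "logs/api_server.txt"
+            ]
+        },
+        "print_tracebacks_fl": false
+    }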
\ No newline at end of file diff --git a/docs/spec/uml/monitor_db.png b/docs/spec/uml/monitor_db.png index b47336a9..8fef0ec4 100644 Binary files a/docs/spec/uml/monitor_db.png and b/docs/spec/uml/monitor_db.png differ diff --git a/docs/spec/uml/monitor_db.puml b/docs/spec/uml/monitor_db.puml index 7272ecf5..71db5461 100644 --- a/docs/spec/uml/monitor_db.puml +++ b/docs/spec/uml/monitor_db.puml @@ -10,6 +10,12 @@ object "**Transaction Record**" as tran_rec { creation_time [DATETIME] } + +object "**Warning**" as warn { + id [INT](unique) + warning [STRING] +} + object "**Sub Record**" as sub_rec { id [INT](unique) sub_id [UUID_64|STRING](unique) @@ -26,5 +32,6 @@ object "**Failed Files**" as fail_rec { tran_rec "1" *-- "many" sub_rec sub_rec "0" *-- "many" fail_rec +tran_rec "0" *-- "many" warn @enduml diff --git a/nlds/details.py b/nlds/details.py index 9a635297..ea949001 100644 --- a/nlds/details.py +++ b/nlds/details.py @@ -1,6 +1,6 @@ from collections import namedtuple from enum import Enum -from typing import NamedTuple, Optional, List, Dict +from typing import NamedTuple, Optional, List, Dict, TypeVar from json import JSONEncoder from pathlib import Path import stat @@ -9,6 +9,7 @@ from pydantic import BaseModel from .utils.permissions import check_permissions +from .rabbit.publisher import RabbitMQPublisher as RMQP # Patch the JSONEncoder so that a custom json serialiser can be run instead of # of the default, if one exists. This patches for ALL json.dumps calls. @@ -35,6 +36,38 @@ def __str__(self): "NOT_RECOGNISED", "UNINDEXED"][self.value] +class Retries(BaseModel): + count: Optional[int] = 0 + reasons: Optional[List[str]] = [] + + def increment(self, reason: str = None) -> None: + self.count += 1 + if reason: + self.reasons.append(reason) + + def reset(self) -> None: + self.count = 0 + self.reasons = [] + + def to_dict(self) -> Dict: + return { + RMQP.MSG_RETRIES: { + RMQP.MSG_RETRIES_COUNT: self.count, + RMQP.MSG_RETRIES_REASONS: self.reasons + } + } + + @classmethod + def from_dict(cls, dictionary: Dict[str, str]): + """Takes in a dictionary of the form generated by to_dict(), and returns + a Retries object representation of it.""" + return cls( + count=dictionary[RMQP.MSG_RETRIES][RMQP.MSG_RETRIES_COUNT], + reasons=dictionary[RMQP.MSG_RETRIES][RMQP.MSG_RETRIES_REASONS], + ) + +RetriesType = TypeVar('RetriesType', bound=Retries) + class PathDetails(BaseModel): original_path: Optional[str] object_name: Optional[str] @@ -47,8 +80,7 @@ class PathDetails(BaseModel): modify_time: Optional[float] path_type: Optional[PathType] = PathType.UNINDEXED link_path: Optional[str] - retries: Optional[int] = 0 - retry_reasons: Optional[List[str]] = [] + retries: Optional[RetriesType] = Retries() @property def path(self) -> str: @@ -69,23 +101,28 @@ def to_json(self): "path_type": self.path_type.value, "link_path": self.link_path, }, - "retries": self.retries, - "retry_reasons": self.retry_reasons + **self.retries.to_dict(), } @classmethod def from_dict(cls, json_contents: Dict[str, str]): - return cls(**json_contents['file_details'], - retries=json_contents["retries"], - retry_reasons=json_contents["retry_reasons"]) + if 'retries' in json_contents: + retries = Retries(**json_contents["retries"]) + else: + retries = Retries() + return cls( + **json_contents['file_details'], + retries=retries + ) @classmethod def from_path(cls, path: str): pd = cls(original_path=path) - return pd.stat() + pd.stat() + return pd @classmethod - def from_stat(cls, path: str, stat_result: NamedTuple): + def 
from_stat_result(cls, path: str, stat_result: NamedTuple): pd = cls(original_path=path) pd.stat(stat_result=stat_result) return pd @@ -137,12 +174,3 @@ def check_permissions(self, uid: int, gid: int, access=os.R_OK): path=self.original_path, stat_result=self.get_stat_result()) - def increment_retry(self, retry_reason: str = None) -> None: - self.retries += 1 - if retry_reason: - self.retry_reasons.append(retry_reason) - - def reset_retries(self) -> None: - self.retries = 0 - self.retry_reasons = [] - diff --git a/nlds/errors.py b/nlds/errors.py index 9463a0fc..7ac6b9e5 100644 --- a/nlds/errors.py +++ b/nlds/errors.py @@ -22,3 +22,8 @@ class RabbitRetryError(BaseException): def __init__(self, *args: object, ampq_exception: Exception = None) -> None: super().__init__(*args) self.ampq_exception = ampq_exception + +class CallbackError(BaseException): + + def __init__(self, *args: object) -> None: + super().__init__(*args) diff --git a/nlds/rabbit/consumer.py b/nlds/rabbit/consumer.py index 403ea211..7824eeb0 100644 --- a/nlds/rabbit/consumer.py +++ b/nlds/rabbit/consumer.py @@ -18,6 +18,9 @@ from datetime import datetime, timedelta import uuid import json +from json.decoder import JSONDecodeError +from urllib3.exceptions import HTTPError +import signal from pika.exceptions import StreamLostError, AMQPConnectionError from pika.channel import Channel @@ -33,8 +36,8 @@ LOGGING_CONFIG_SECTION, LOGGING_CONFIG_STDOUT, RABBIT_CONFIG_QUEUES, LOGGING_CONFIG_STDOUT_LEVEL, RABBIT_CONFIG_QUEUE_NAME, LOGGING_CONFIG_ROLLOVER) -from ..details import PathDetails -from ..errors import RabbitRetryError +from ..details import PathDetails, Retries +from ..errors import RabbitRetryError, CallbackError logger = logging.getLogger("nlds.root") @@ -89,6 +92,12 @@ def has_name(cls, name): def get_final_states(cls): return cls.TRANSFER_GETTING, cls.TRANSFER_PUTTING, cls.FAILED + def to_json(self): + return self.value + +class SigTermError(Exception): + pass + class RabbitMQConsumer(ABC, RabbitMQPublisher): DEFAULT_QUEUE_NAME = "test_q" DEFAULT_ROUTING_KEY = "test" @@ -103,6 +112,7 @@ class RabbitMQConsumer(ABC, RabbitMQPublisher): def __init__(self, queue: str = None, setup_logging_fl=False): super().__init__(name=queue, setup_logging_fl=False) + self.loop = True # TODO: (2021-12-21) Only one queue can be specified at the moment, # should be able to specify multiple queues to subscribe to but this @@ -150,6 +160,7 @@ def __init__(self, queue: str = None, setup_logging_fl=False): self.completelist = [] self.retrylist = [] self.failedlist = [] + self.max_retries = 5 # Controls default behaviour of logging when certain exceptions are # caught in the callback. @@ -235,6 +246,7 @@ def parse_filelist(self, body_json: dict) -> List[PathDetails]: def send_pathlist(self, pathlist: List[PathDetails], routing_key: str, body_json: Dict[str, str], state: State = None, mode: FilelistType = FilelistType.processed, + warning: List[str] = None ) -> None: """Convenience function which sends the given list of PathDetails objects to the exchange with the given routing key and message body. @@ -264,14 +276,18 @@ def send_pathlist(self, pathlist: List[PathDetails], routing_key: str, delay = 0 body_json[self.MSG_DETAILS][self.MSG_RETRY] = False if mode == FilelistType.processed: - # Reset the retries upon successful indexing. + # Reset the retries (both transaction-level and file-level) upon + # successful completion of processing. 
+ trans_retries = Retries.from_dict(body_json) + trans_retries.reset() + body_json.update(trans_retries.to_dict()) for path_details in pathlist: - path_details.reset_retries() + path_details.retries.reset() elif mode == FilelistType.retry: # Delay the retry message depending on how many retries have been # accumulated. All retries in a retry list _should_ be the same so # base it off of the first one. - delay = self.get_retry_delay(pathlist[0].retries) + delay = self.get_retry_delay(pathlist[0].retries.count) self.log(f"Adding {delay / 1000}s delay to retry. Should be sent at" f" {datetime.now() + timedelta(milliseconds=delay)}", self.RK_LOG_DEBUG) @@ -295,6 +311,10 @@ def send_pathlist(self, pathlist: List[PathDetails], routing_key: str, self.publish_message(routing_key, body_json, delay=delay) # Send message to monitoring to keep track of state + # add any warning + if warning and len(warning) > 0: + body_json[self.MSG_DETAILS][self.MSG_WARNING] = warning + monitoring_rk = ".".join([routing_key[0], self.RK_MONITOR_PUT, self.RK_START]) @@ -338,6 +358,115 @@ def setup_logging(self, enable=False, log_level: str = None, return super().setup_logging(enable, log_level, log_format, add_stdout_fl, stdout_log_level, log_files, log_rollover) + def _log_errored_transaction(self, body, exception): + """Log message which has failed at some point in its callback""" + if self.print_tracebacks_fl: + tb = traceback.format_exc() + self.log("Printing traceback of error: ", self.RK_LOG_DEBUG) + self.log(tb, self.RK_LOG_DEBUG) + self.log( + f"Encountered error in message callback ({exception}), sending" + " to logger.", self.RK_LOG_ERROR, exc_info=exception + ) + self.log( + f"Failed message content: {body}", + self.RK_LOG_DEBUG + ) + + def _handle_expected_error(self, body, routing_key, original_error): + """Handle the workflow of an expected error - attempt to retry the + transaction or fail it as apparopriate. Given we don't know exactly what + is wrong with the message we need to be quite defensive with error + handling here so as to avoid breaking the consumption loop. + + """ + # First we log the expected error + self._log_errored_transaction(body, original_error) + + # Then we try to parse its source and retry information from the message + # body. + retries = None + try: + # First try to parse message body for retry information + body_json = json.loads(body) + retries = Retries.from_dict(body_json) + except (JSONDecodeError, KeyError) as e: + self.log("Could not retrieve failed message retry information " + f"{e}, will now attempt to fail message in monitoring.", + self.RK_LOG_INFO) + try: + # Get message source from routing key, if possible + rk_parts = self.split_routing_key(routing_key) + rk_source = rk_parts[0] + except Exception as e: + self.log("Could not retrieve routing key source, reverting to " + "default NLDS value.", self.RK_LOG_WARNING) + rk_source = self.RK_ROOT + + monitoring_rk = ".".join([rk_source, + self.RK_MONITOR_PUT, + self.RK_START]) + + if retries is not None and retries.count <= self.max_retries: + # Retry the job + self.log(f"Retrying errored job with routing key {routing_key}", + self.RK_LOG_INFO) + try: + self._retry_transaction(body_json, retries, routing_key, + monitoring_rk, original_error) + except Exception as e: + self.log(f"Failed attempt to retry transaction that failed " + "during callback. Error: {e}.", + self.RK_LOG_WARNING) + # Fail the job if at any point the attempt to retry fails. 
+ self._fail_transaction(body_json, monitoring_rk) + else: + # Fail the job + self._fail_transaction(body_json, monitoring_rk) + + def _fail_transaction(self, body_json, monitoring_rk): + """Attempt to mark transaction as failed in monitoring db""" + try: + # Send message to monitoring to keep track of state + body_json[self.MSG_DETAILS][self.MSG_STATE] = State.FAILED + body_json[self.MSG_DETAILS][self.MSG_RETRY] = False + self.publish_message(monitoring_rk, body_json) + except Exception as e: + # If this fails there's not much we can do at this stage... + # TODO: might be worth figuring out a way of just extracting the + # transaction id and failing the job from that? If it's gotten to + # this point and failed then + self.log("Failed attempt to mark transaction as failed in " + "monitoring.", self.RK_LOG_WARNING) + self.log(f"Exception that arose during attempt to mark job as " + f"failed: {e}", self.RK_LOG_DEBUG) + self.log(f"Message that couldn't be failed: " + f"{json.dumps(body_json)}", self.RK_LOG_DEBUG) + + def _retry_transaction( + self, + body_json: Dict[str, str], + retries: Retries, + original_rk: str, + monitoring_rk: str, + error: Exception + ) -> None: + """Attempt to retry the message with a retry delay, back to the original + routing_key""" + # Delay the retry message depending on how many retries have been + # accumulated - using simply the transaction-level retries + retries.increment(reason=f"Exception during callback: {error}") + body_json.update(retries.to_dict()) + delay = self.get_retry_delay(retries.count) + self.log(f"Adding {delay / 1000}s delay to retry. Should be sent at" + f" {datetime.now() + timedelta(milliseconds=delay)}", + self.RK_LOG_DEBUG) + body_json[self.MSG_DETAILS][self.MSG_RETRY] = True + + # Send to original routing key (i.e. retry it) with the requisite delay + # and also update the monitoring db. + self.publish_message(original_rk, body_json, delay=delay) + self.publish_message(monitoring_rk, body_json) @staticmethod def _acknowledge_message(channel: Channel, delivery_tag: str) -> None: @@ -400,33 +529,13 @@ def _wrapped_callback(self, ch: Channel, method: Method, properties: Header, KeyError, PermissionError, RabbitRetryError, - ) as e: - try: - # Attempt to mark job as failed in monitoring db - # TODO: this probably isn't the best way of doing this! 
- rk_parts = self.split_routing_key(method.routing_key) - body_json = json.loads(body) - - # Send message to monitoring to keep track of state - monitoring_rk = ".".join([rk_parts[0], - self.RK_MONITOR_PUT, - self.RK_START]) - body_json[self.MSG_DETAILS][self.MSG_STATE] = State.FAILED - self.publish_message(monitoring_rk, body_json) - except: - self.log("Failed attempt to mark job as failed in monitoring.", - self.RK_LOG_WARNING) - if self.print_tracebacks_fl: - tb = traceback.format_exc() - self.log(tb, self.RK_LOG_DEBUG) - self.log( - f"Encountered error ({e}), sending to logger.", - self.RK_LOG_ERROR, exc_info=e - ) - self.log( - f"Failed message content: {body}", - self.RK_LOG_DEBUG - ) + CallbackError, + JSONDecodeError, + HTTPError, + ) as original_error: + self._handle_expected_error(body, method.routing_key, original_error) + except Exception as e: + self._log_errored_transaction(body, e) finally: # Ack message only if it has failed in the limited number of ways # above, otherwise the exception is reraised and breaks the @@ -476,6 +585,9 @@ def append_route_info(cls, body: Dict, route_info: str = None) -> Dict: else: body[cls.MSG_DETAILS][cls.MSG_ROUTE] = route_info return body + + def exit(self, *args): + raise SigTermError def run(self): """ @@ -488,7 +600,10 @@ def run(self): :return: """ - while True: + # set up SigTerm handler + signal.signal(signal.SIGTERM, self.exit) + + while self.loop: self.get_connection() try: @@ -497,7 +612,11 @@ def run(self): self.channel.start_consuming() except KeyboardInterrupt: - self.channel.stop_consuming() + self.loop = False + break + + except SigTermError: + self.loop = False break except (StreamLostError, AMQPConnectionError) as e: @@ -509,6 +628,8 @@ def run(self): # Catch all other exceptions and log them as critical. 
tb = traceback.format_exc() self.log(tb, self.RK_LOG_CRITICAL, exc_info=e) - - self.channel.stop_consuming() + self.loop = False break + + self.channel.stop_consuming() + diff --git a/nlds/rabbit/publisher.py b/nlds/rabbit/publisher.py index abd56c0c..7f10abb5 100644 --- a/nlds/rabbit/publisher.py +++ b/nlds/rabbit/publisher.py @@ -10,25 +10,31 @@ import sys from datetime import datetime, timedelta -from uuid import UUID import json import logging from logging.handlers import TimedRotatingFileHandler from typing import Dict, List import pathlib -from collections import namedtuple +import collections import pika from pika.exceptions import AMQPConnectionError, UnroutableError, ChannelWrongStateError from retry import retry from ..server_config import ( - LOGGING_CONFIG_ROLLOVER, load_config, LOGGING_CONFIG_FILES, LOGGING_CONFIG_STDOUT, - RABBIT_CONFIG_SECTION, LOGGING_CONFIG_SECTION, LOGGING_CONFIG_LEVEL, - LOGGING_CONFIG_STDOUT_LEVEL, LOGGING_CONFIG_FORMAT, LOGGING_CONFIG_ENABLE + load_config, + GENERAL_CONFIG_SECTION, + RABBIT_CONFIG_SECTION, + LOGGING_CONFIG_SECTION, + LOGGING_CONFIG_ROLLOVER, + LOGGING_CONFIG_FILES, + LOGGING_CONFIG_STDOUT, + LOGGING_CONFIG_LEVEL, + LOGGING_CONFIG_STDOUT_LEVEL, + LOGGING_CONFIG_FORMAT, + LOGGING_CONFIG_ENABLE, ) from ..errors import RabbitRetryError -from ..details import PathDetails logger = logging.getLogger("nlds.root") @@ -113,6 +119,7 @@ class RabbitMQPublisher(): MSG_NEW_META = "new_meta" MSG_LABEL = "label" MSG_TAG = "tag" + MSG_DEL_TAG = "del_tag" MSG_PATH = "path" MSG_HOLDING_ID = "holding_id" MSG_HOLDING_LIST = "holdings" @@ -122,8 +129,13 @@ class RabbitMQPublisher(): MSG_FAILURE = "failure" MSG_USER_QUERY = "user_query" MSG_GROUP_QUERY = "group_query" - MSG_RETRY_COUNT = "retry_count" + MSG_RETRY_COUNT_QUERY = "retry_count" MSG_RECORD_LIST = "records" + MSG_WARNING = "warning" + + MSG_RETRIES = "retries" + MSG_RETRIES_COUNT = "count" + MSG_RETRIES_REASONS = "reasons" MSG_TYPE = "type" MSG_TYPE_STANDARD = "standard" @@ -145,6 +157,10 @@ def __init__(self, name="publisher", setup_logging_fl=False): # Get rabbit-specific section of config file self.whole_config = load_config() self.config = self.whole_config[RABBIT_CONFIG_SECTION] + if GENERAL_CONFIG_SECTION in self.whole_config: + self.general_config = self.whole_config[GENERAL_CONFIG_SECTION] + else: + self.general_config = dict() # Set name for logging purposes self.name = name @@ -163,7 +179,15 @@ def __init__(self, name="publisher", setup_logging_fl=False): self.connection = None self.channel = None - self.retry_delays = self.DEFAULT_RETRY_DELAYS + try: + # Do some basic verification of the general retry delays. + self.retry_delays = self.general_config[self.RETRY_DELAYS] + assert (isinstance(self.retry_delays, collections.Sequence) + and not isinstance(self.retry_delays, str)) + assert len(self.retry_delays) > 0 + assert isinstance(self.retry_delays[0], int) + except (KeyError, TypeError, AssertionError): + self.retry_delays = self.DEFAULT_RETRY_DELAYS if setup_logging_fl: self.setup_logging() diff --git a/nlds/routers/files.py b/nlds/routers/files.py index 8c15eed1..5a9b39cf 100644 --- a/nlds/routers/files.py +++ b/nlds/routers/files.py @@ -16,16 +16,15 @@ from uuid import UUID, uuid4 from typing import Optional, List, Dict from copy import deepcopy -import json from ..routers import rabbit_publisher from ..rabbit.publisher import RabbitMQPublisher as RMQP -from . 
import rpc_publisher from ..errors import ResponseError -from ..details import PathDetails +from ..details import PathDetails, Retries from ..authenticators.authenticate_methods import authenticate_token, \ authenticate_group, \ authenticate_user + router = APIRouter() # uuid (for testing) @@ -58,11 +57,20 @@ def get_cleaned_list(self) -> List[str]: class FileResponse(BaseModel): - uuid: UUID + transaction_id: UUID msg: str - + user: str = "" + group: str = "" + api_action: str = "" + job_label: str = "" + tenancy: str = "" + label: str = "" + holding_id: int = -1 + tag: str = "" ############################ GET METHOD ############################ +# this is not used by the NLDS client but is left in case another +# program contacts this URL @router.get("/", status_code = status.HTTP_202_ACCEPTED, responses = { @@ -100,14 +108,13 @@ async def get(transaction_id: UUID, job_label = transaction_id[0:8] # return response, job label accepted for processing response = FileResponse( - uuid = transaction_id, - msg = (f"GET transaction with transaction_id:{transaction_id} and " - "job label:{job_label} accepted for processing.") + transaction_id = transaction_id, + msg = (f"GET transaction accepted for processing.") ) contents = [filepath, ] # create the message dictionary - do this here now as it's more transparent routing_key = f"{RMQP.RK_ROOT}.{RMQP.RK_ROUTE}.{RMQP.RK_GETLIST}" - api_method = f"{RMQP.RK_GETLIST}" + api_action = f"{RMQP.RK_GETLIST}" msg_dict = { RMQP.MSG_DETAILS: { RMQP.MSG_TRANSACT_ID: str(transaction_id), @@ -118,15 +125,24 @@ async def get(transaction_id: UUID, RMQP.MSG_TARGET: target, RMQP.MSG_ACCESS_KEY: access_key, RMQP.MSG_SECRET_KEY: secret_key, - RMQP.MSG_API_ACTION: api_method, + RMQP.MSG_API_ACTION: api_action, RMQP.MSG_JOB_LABEL: job_label }, RMQP.MSG_DATA: { # Convert to PathDetails for JSON serialisation RMQP.MSG_FILELIST: [PathDetails(original_path=item) for item in contents], - }, - RMQP.MSG_TYPE: RMQP.MSG_TYPE_STANDARD + }, + **Retries().to_dict(), + RMQP.MSG_TYPE: RMQP.MSG_TYPE_STANDARD, } + response.user = user + response.group = group + response.api_action = api_action + if job_label: + response.job_label = job_label + if tenancy: + response.tenancy = tenancy + rabbit_publisher.publish_message(routing_key, msg_dict) return JSONResponse(status_code = status.HTTP_202_ACCEPTED, content = response.json()) @@ -170,9 +186,8 @@ async def put(transaction_id: UUID, job_label = transaction_id[0:8] # return response, transaction id accepted for processing response = FileResponse( - uuid = transaction_id, - msg = (f"GETLIST transaction with transaction_id:{transaction_id} and " - "job_label:{job_label} accepted for processing.") + transaction_id = transaction_id, + msg = (f"GETLIST transaction accepted for processing.") ) # Convert filepath or filelist to lists @@ -198,16 +213,28 @@ async def put(transaction_id: UUID, # Convert to PathDetails for JSON serialisation RMQP.MSG_FILELIST: [PathDetails(original_path=item) for item in contents], }, - RMQP.MSG_TYPE: RMQP.MSG_TYPE_STANDARD + **Retries().to_dict(), + RMQP.MSG_TYPE: RMQP.MSG_TYPE_STANDARD, } + response.user = user + response.group = group + response.api_action = api_method + if job_label: + response.job_label = job_label + if tenancy: + response.tenancy = tenancy # add the metadata meta_dict = {} if (filemodel.label): meta_dict[RMQP.MSG_LABEL] = filemodel.label + response.label = filemodel.label if (filemodel.holding_id): meta_dict[RMQP.MSG_HOLDING_ID] = filemodel.holding_id + response.holding_id = 
filemodel.holding_id if (filemodel.tag): - meta_dict[RMQP.MSG_TAG] = filemodel.tag + tag_dict = filemodel.tag + meta_dict[RMQP.MSG_TAG] = tag_dict + response.tag = tag_dict if (len(meta_dict) > 0): msg_dict[RMQP.MSG_META] = meta_dict @@ -257,12 +284,11 @@ async def put(transaction_id: UUID, if job_label is None: job_label = transaction_id[0:8] - + # return response, transaction id accepted for processing response = FileResponse( - uuid = transaction_id, - msg = (f"PUT transaction with transaction_id:{transaction_id} and " - "job_label:{job_label} accepted for processing.\n") + transaction_id = transaction_id, + msg = (f"PUT transaction accepted for processing.") ) # create the message dictionary - do this here now as it's more transparent routing_key = f"{RMQP.RK_ROOT}.{RMQP.RK_ROUTE}.{RMQP.RK_PUT}" @@ -283,16 +309,28 @@ async def put(transaction_id: UUID, # Convert to PathDetails for JSON serialisation RMQP.MSG_FILELIST: [PathDetails(original_path=item) for item in contents], }, - RMQP.MSG_TYPE: RMQP.MSG_TYPE_STANDARD + **Retries().to_dict(), + RMQP.MSG_TYPE: RMQP.MSG_TYPE_STANDARD, } + response.user = user + response.group = group + response.api_action = api_method + if job_label: + response.job_label = job_label + if tenancy: + response.tenancy = tenancy # add the metadata meta_dict = {} if (filemodel.label): meta_dict[RMQP.MSG_LABEL] = filemodel.label + response.label = filemodel.label if (filemodel.holding_id): meta_dict[RMQP.MSG_HOLDING_ID] = filemodel.holding_id + response.holding_id = filemodel.holding_id if (filemodel.tag): - meta_dict[RMQP.MSG_TAG] = filemodel.tag + tag_dict = filemodel.tag + meta_dict[RMQP.MSG_TAG] = tag_dict + response.tag = tag_dict if (len(meta_dict) > 0): msg_dict[RMQP.MSG_META] = meta_dict diff --git a/nlds/routers/find.py b/nlds/routers/find.py index dd62c73c..c17e0008 100644 --- a/nlds/routers/find.py +++ b/nlds/routers/find.py @@ -17,7 +17,6 @@ from typing import Optional, List, Dict from ..rabbit.publisher import RabbitMQPublisher as RMQP -from ..rabbit.rpc_publisher import RabbitMQRPCPublisher from ..routers import rpc_publisher from ..errors import ResponseError from ..authenticators.authenticate_methods import authenticate_token, \ diff --git a/nlds/routers/list.py b/nlds/routers/list.py index e35d0137..4ab4760f 100644 --- a/nlds/routers/list.py +++ b/nlds/routers/list.py @@ -21,6 +21,7 @@ from ..authenticators.authenticate_methods import authenticate_token, \ authenticate_group, \ authenticate_user +from ..utils.process_tag import process_tag router = APIRouter() @@ -73,13 +74,8 @@ async def get(token: str = Depends(authenticate_token), tag_dict = {} # convert the string into a dictionary try: - # strip whitespace and "{" "}" symbolsfirst - tag_list = (tag.replace(" ","").replace("{", "").replace("}", "") - ).split(",") - for tag_i in tag_list: - tag_kv = tag_i.split(":") - tag_dict[tag_kv[0]] = tag_kv[1] - except: # what exception might be raised here? 
+ tag_dict = process_tag(tag) + except ValueError: response_error = ResponseError( loc = ["holdings", "get"], msg = "tag cannot be processed.", diff --git a/nlds/routers/meta.py b/nlds/routers/meta.py index 43c9b45c..d1b82f06 100644 --- a/nlds/routers/meta.py +++ b/nlds/routers/meta.py @@ -30,6 +30,7 @@ class MetaModel(BaseModel): new_label: str = None new_tag: Dict[str, str] = None + del_tag: Dict[str, str] = None class MetaResponse(BaseModel): files: List[Dict] @@ -53,7 +54,7 @@ async def post(metamodel: MetaModel, group: str = Depends(authenticate_group), label: Optional[str] = None, holding_id: Optional[int] = None, - tag: Optional[str] = None, + tag: Optional[str] = None ): # create the message dictionary @@ -98,6 +99,8 @@ async def post(metamodel: MetaModel, new_meta_dict[RMQP.MSG_LABEL] = metamodel.new_label if (metamodel.new_tag): new_meta_dict[RMQP.MSG_TAG] = metamodel.new_tag + if (metamodel.del_tag): + new_meta_dict[RMQP.MSG_DEL_TAG] = metamodel.del_tag # add the "meta" section if (len(meta_dict) > 0): diff --git a/nlds/routers/status.py b/nlds/routers/status.py index 1a3d9ae8..04844f5a 100644 --- a/nlds/routers/status.py +++ b/nlds/routers/status.py @@ -127,7 +127,7 @@ async def get(token: str = Depends(authenticate_token), RMQP.MSG_JOB_LABEL: job_label, RMQP.MSG_STATE: state, RMQP.MSG_SUB_ID: sub_id, - RMQP.MSG_RETRY_COUNT: retry_count, + RMQP.MSG_RETRY_COUNT_QUERY: retry_count, RMQP.MSG_USER_QUERY: user, RMQP.MSG_GROUP_QUERY: group, RMQP.MSG_API_ACTION: api_action, diff --git a/nlds/server_config.py b/nlds/server_config.py index a92d9647..07d9e18a 100644 --- a/nlds/server_config.py +++ b/nlds/server_config.py @@ -31,6 +31,8 @@ LOGGING_CONFIG_FILES = "log_files" LOGGING_CONFIG_ROLLOVER = "rollover" +GENERAL_CONFIG_SECTION = "general" + # Defines the compulsory server config file sections CONFIG_SCHEMA = ( (AUTH_CONFIG_SECTION, ("authenticator_backend", )), diff --git a/nlds/utils/process_tag.py b/nlds/utils/process_tag.py index 643d3f32..410a1bff 100644 --- a/nlds/utils/process_tag.py +++ b/nlds/utils/process_tag.py @@ -1,15 +1,16 @@ def process_tag(tag): """Process a tag in string format into dictionary format""" - try: + # try: + if True: tag_dict = {} - # strip whitespace and "{" "}" symbolsfirst - tag_list = (tag.replace(" ","").replace("{", "").replace("}", "") + # strip "{" "}" symbolsfirst + tag_list = (tag.replace("{", "").replace("}", "") ).split(",") for tag_i in tag_list: tag_kv = tag_i.split(":") if len(tag_kv) < 2: continue tag_dict[tag_kv[0]] = tag_kv[1] - except: # what exceptions might be raised here? - raise ValueError + # except: # what exceptions might be raised here? 
+ # raise ValueError return tag_dict \ No newline at end of file diff --git a/nlds_processors/catalog/catalog.py b/nlds_processors/catalog/catalog.py index 445b4b47..0831aedc 100644 --- a/nlds_processors/catalog/catalog.py +++ b/nlds_processors/catalog/catalog.py @@ -1,6 +1,7 @@ # SQLalchemy imports from sqlalchemy import func, Enum -from sqlalchemy.exc import IntegrityError, OperationalError, ArgumentError +from sqlalchemy.exc import IntegrityError, OperationalError, ArgumentError, \ + NoResultFound from nlds_processors.catalog.catalog_models import CatalogBase, File, Holding,\ Location, Transaction, Storage, Checksum, Tag @@ -24,8 +25,8 @@ def __init__(self, db_engine: str, db_options: str): self.session = None - def _user_has_get_holding_permission(self, - user: str, + @staticmethod + def _user_has_get_holding_permission(user: str, group: str, holding: Holding) -> bool: """Check whether a user has permission to view this holding. @@ -47,24 +48,52 @@ def get_holding(self, assert(self.session != None) try: if holding_id: - holding = self.session.query(Holding).filter( + holding_q = self.session.query(Holding).filter( Holding.user == user, Holding.group == group, Holding.id == holding_id, - ).all() + ) elif transaction_id: - holding = self.session.query(Holding).filter( + holding_q = self.session.query(Holding).filter( Holding.user == user, Holding.group == group, Transaction.holding_id == Holding.id, Transaction.transaction_id == transaction_id - ).all() + ) else: - holding = self.session.query(Holding).filter( + # label == None is return all holdings + if label is None: + search_label = ".*" + else: + search_label = label + + holding_q = self.session.query(Holding).filter( Holding.user == user, Holding.group == group, - Holding.label.regexp_match(label), - ).all() + Holding.label.regexp_match(search_label), + ) + # filter the query on any tags + if tag: + # get the holdings that have a key that matches one or more of + # the keys in the tag dictionary passed as a parameter + holding_q = holding_q.join(Tag).filter( + Tag.key.in_(tag.keys()) + ) + # check for zero + if holding_q.count == 0: + holding = [] + else: + # we have now got a subset of holdings with a tag that has + # a key that matches the keys in the input dictionary + # now find the holdings where the key and value match + for key, item in tag.items(): + holding_q = holding_q.filter( + Tag.key == key, + Tag.value == item + ) + holding = holding_q.all() + else: + holding = holding_q.all() # check if at least one holding found if len(holding) == 0: raise KeyError @@ -76,21 +105,21 @@ def get_holding(self, f"to access the holding with label:{h.label}." ) except (IntegrityError, KeyError, ArgumentError): + msg = "" if holding_id: - raise CatalogError( - f"Holding with holding_id:{holding_id} not found for " - f"user:{user} and group:{group}." - ) + msg = (f"Holding with holding_id:{holding_id} not found for " + f"user:{user} and group:{group}") elif transaction_id: - raise CatalogError( - f"Holding containing transaction_id:{transaction_id} not " - f"found for user:{user} and group:{group}." - ) + msg = (f"Holding containing transaction_id:{transaction_id} not " + f"found for user:{user} and group:{group}") else: - raise CatalogError( - f"Holding with label:{label} not found for " - f"user:{user} and group:{group}." - ) + msg = (f"Holding with label:{label} not found for " + f"user:{user} and group:{group}") + if tag: + msg += f" with tags:{tag}." + else: + msg += "." 
+ raise CatalogError(msg) except (OperationalError): raise CatalogError( f"Invalid regular expression:{label} when listing holding for " @@ -124,9 +153,15 @@ def create_holding(self, def modify_holding(self, holding: Holding, new_label: str=None, - new_tags: dict=None) -> Holding: + new_tags: dict=None, + del_tags: dict=None) -> Holding: """Find a holding and modify the information in it""" assert(self.session != None) + if not isinstance(holding, Holding): + raise CatalogError( + f"Cannot modify holding, it does not appear to be a valid " + f"Holding ({holding})." + ) # change the label if a new_label supplied if new_label: try: @@ -141,9 +176,23 @@ def modify_holding(self, if new_tags: for k in new_tags: - # create_tag takes a key and value - tag = self.create_tag(holding, k, new_tags[k]) - self.session.flush() + # if the tag exists then modify it, if it doesn't then create it + try: + # get + tag = self.get_tag(holding, k) + except CatalogError: + # create + tag = self.create_tag(holding, k, new_tags[k]) + else: + # modify + tag = self.modify_tag(holding, k, new_tags[k]) + if del_tags: + for k in del_tags: + # if the tag exists and the value matches then delete it + tag = self.get_tag(holding, k) + if tag.value == del_tags[k]: + self.del_tag(holding, k) + self.session.flush() return holding @@ -294,13 +343,9 @@ def get_files(self, assert(self.session != None) # Nones are set to .* in the regexp matching # get the matching holdings first, these match all but the path - if holding_label: - holding_search = holding_label - else: - holding_search = ".*" holding = self.get_holding( - user, group, holding_search, holding_id, transaction_id, tag + user, group, holding_label, holding_id, transaction_id, tag ) if path: @@ -377,6 +422,7 @@ def create_file(self, ) return new_file + def delete_files(self, user: str, group: str, @@ -404,6 +450,7 @@ def delete_files(self, err_msg = f"File with original_path:{path} could not be deleted" raise CatalogError(err_msg) + def get_location(self, file: File, storage_type: Enum) -> Location: @@ -465,8 +512,62 @@ def create_tag(self, value = value, holding_id = holding.id ) + self.session.add(tag) + self.session.flush() # flush to generate tag.id except (IntegrityError, KeyError): raise CatalogError( f"Tag could not be added to holding:{holding.label}" ) - return tag \ No newline at end of file + return tag + + + def get_tag(self, holding: Holding, key: str): + """Get the tag with a specific key""" + assert(self.session != None) + try: + tag = self.session.query(Tag).filter( + Tag.key == key, + Tag.holding_id == holding.id + ).one() # uniqueness constraint guarantees only one + except (NoResultFound, KeyError): + raise CatalogError( + f"Tag with key:{key} not found" + ) + return tag + + + def modify_tag(self, holding: Holding, key: str, value: str): + """Modify a tag that has the key, with a new value. 
+ Tag has to exist, current value will be overwritten.""" + assert(self.session != None) + try: + tag = self.session.query(Tag).filter( + Tag.key == key, + Tag.holding_id == holding.id + ).one() # uniqueness constraint guarantees only one + tag.value = value + except (NoResultFound, KeyError): + raise CatalogError( + f"Tag with key:{key} not found" + ) + return tag + + + def del_tag(self, holding: Holding, key: str): + """Delete a tag that has the key""" + assert(self.session != None) + # use a checkpoint as the tags are being deleted in an external loop and + # using a checkpoint will ensure that any completed deletes are committed + checkpoint = self.session.begin_nested() + try: + tag = self.session.query(Tag).filter( + Tag.key == key, + Tag.holding_id == holding.id + ).one() # uniqueness constraint guarantees only one + self.session.delete(tag) + except (NoResultFound, KeyError): + checkpoint.rollback() + raise CatalogError( + f"Tag with key:{key} not found" + ) + return None diff --git a/nlds_processors/catalog/catalog_models.py b/nlds_processors/catalog/catalog_models.py index 284e6bc4..b276c73d 100644 --- a/nlds_processors/catalog/catalog_models.py +++ b/nlds_processors/catalog/catalog_models.py @@ -28,6 +28,18 @@ class Holding(CatalogBase): transactions = relationship("Transaction", cascade="delete, delete-orphan") # label must be unique per user __table_args__ = (UniqueConstraint('label', 'user'),) + # return the tags as a dictionary + def get_tags(self): + tags = {} + for t in self.tags: + tags[t.key] = t.value + return tags + # return the transaction ids as a list + def get_transaction_ids(self): + t_ids = [] + for t in self.transactions: + t_ids.append(t.transaction_id) + return t_ids class Transaction(CatalogBase): diff --git a/nlds_processors/catalog/catalog_worker.py b/nlds_processors/catalog/catalog_worker.py index 899f2327..5cd3ac33 100644 --- a/nlds_processors/catalog/catalog_worker.py +++ b/nlds_processors/catalog/catalog_worker.py @@ -35,6 +35,7 @@ from nlds.rabbit.consumer import RabbitMQConsumer as RMQC from nlds.rabbit.consumer import State +from nlds.errors import CallbackError from nlds_processors.catalog.catalog import Catalog, CatalogError from nlds_processors.catalog.catalog_models import Storage @@ -142,7 +143,11 @@ def _catalog_put(self, body: dict, rk_origin: str) -> None: except KeyError: holding_id = None - ######## TAGS TAGS TAGS ######## + # get any tags that exist + try: + tags = body[self.MSG_META][self.MSG_TAG] + except KeyError: + tags = None # start the database transactions self.catalog.start_session() @@ -152,8 +157,15 @@ def _catalog_put(self, body: dict, rk_origin: str) -> None: holding = self.catalog.get_holding(user, group, label, holding_id) except (KeyError, CatalogError): holding = None - if holding is None: + # if the holding_id is not None then raise an error as the user is + # trying to add to a holding that doesn't exist, but creating a new + # holding won't have a holding_id that matches the one they passed in + if (holding_id is not None): + message = (f"Could not add files to holding with holding_id: " + "{holding_id}. 
holding_id does not exist.") + self.log(message, self.RK_LOG_DEBUG) + raise CallbackError(message) try: holding = self.catalog.create_holding(user, group, label) except CatalogError as e: @@ -230,15 +242,34 @@ def _catalog_put(self, body: dict, rk_origin: str) -> None: ) self.completelist.append(pd) except CatalogError as e: - if pd.retries > self.max_retries: + if pd.retries.count > self.max_retries: self.failedlist.append(pd) else: - pd.increment_retry( - retry_reason=f"{e.message}" + pd.retries.increment( + reason=f"{e.message}" ) self.retrylist.append(pd) self.log(e.message, RMQC.RK_LOG_ERROR) continue + + # add the tags - if the tag already exists then don't add it or modify + # it, with the reasoning that the user can change it with the `meta` + # command. + warnings = [] + if tags: + for k in tags: + try: + tag = self.catalog.get_tag(holding, k) + except CatalogError: # tag's key not found so create + self.catalog.create_tag(holding, k, tags[k]) + else: + # append a warning that the tag could not be added to the holding + warnings.append( + f"Tag with key:{k} could not be added to holding with label" + f":{label} as that tag already exists. Tags can be modified" + f" using the meta command" + ) + # stop db transitions and commit self.catalog.save() self.catalog.end_session() @@ -254,7 +285,8 @@ def _catalog_put(self, body: dict, rk_origin: str) -> None: self.RK_LOG_DEBUG ) self.send_pathlist(self.completelist, rk_complete, body, - state=State.CATALOG_PUTTING) + state=State.CATALOG_PUTTING, + warning=warnings) # RETRY if len(self.retrylist) > 0: rk_retry = ".".join([rk_origin, @@ -265,7 +297,8 @@ def _catalog_put(self, body: dict, rk_origin: str) -> None: self.RK_LOG_DEBUG ) self.send_pathlist(self.retrylist, rk_retry, body, mode="retry", - state=State.CATALOG_PUTTING) + state=State.CATALOG_PUTTING, + warning=warnings) # FAILED if len(self.failedlist) > 0: rk_failed = ".".join([rk_origin, @@ -276,7 +309,8 @@ def _catalog_put(self, body: dict, rk_origin: str) -> None: self.RK_LOG_DEBUG ) self.send_pathlist(self.failedlist, rk_failed, body, - mode="failed") + mode="failed", + warning=warnings) def _catalog_get(self, body: dict, rk_origin: str) -> None: @@ -331,7 +365,10 @@ def _catalog_get(self, body: dict, rk_origin: str) -> None: ) except CatalogError as e: self.log(e.message, RMQC.RK_LOG_ERROR) - return + message = (f"Could not find record of requested holding: " + f"label: {holding_label}, id: {holding_id}") + self.log(message, self.RK_LOG_DEBUG) + raise CallbackError(message) for f in filelist: file_details = PathDetails.from_dict(f) @@ -373,23 +410,23 @@ def _catalog_get(self, body: dict, rk_origin: str) -> None: link_path = file.link_path ) except CatalogError as e: - if file_details.retries > self.max_retries: + if file_details.retries.count > self.max_retries: self.failedlist.append(file_details) else: self.retrylist.append(file_details) - file_details.increment_retry( - retry_reason=f"{e.message}" + file_details.retries.increment( + reason=f"{e.message}" ) self.log(e.message, RMQC.RK_LOG_ERROR) continue self.completelist.append(new_file) except CatalogError as e: - if file_details.retries > self.max_retries: + if file_details.retries.count > self.max_retries: self.failedlist.append(file_details) else: - file_details.increment_retry( - retry_reason=f"{e.message}" + file_details.retries.increment( + reason=f"{e.message}" ) self.retrylist.append(file_details) self.log(e.message, RMQC.RK_LOG_ERROR) @@ -504,11 +541,11 @@ def _catalog_del(self, body: dict, rk_origin: str) -> None: 
tag=tag ) except CatalogError as e: - if file_details.retries > self.max_retries: + if file_details.retries.count > self.max_retries: self.failedlist.append(file_details) else: - file_details.increment_retry( - retry_reason=f"{e.message}" + file_details.retries.increment( + reason=f"{e.message}" ) self.retrylist.append(file_details) self.log(e.message, RMQC.RK_LOG_ERROR) @@ -553,6 +590,7 @@ def _catalog_del(self, body: dict, rk_origin: str) -> None: self.catalog.save() self.catalog.end_session() + def _catalog_list(self, body: dict, properties: Header) -> None: """List the users holdings""" # get the user id from the details section of the message @@ -586,7 +624,7 @@ def _catalog_list(self, body: dict, properties: Header) -> None: try: holding_label = body[self.MSG_META][self.MSG_LABEL] except KeyError: - holding_label = ".*" + holding_label = None # get the tags from the details sections of the message try: @@ -618,15 +656,10 @@ def _catalog_list(self, body: dict, properties: Header) -> None: "label": h.label, "user": h.user, "group": h.group, - "tags": h.tags, + "tags": h.get_tags(), + "transactions": h.get_transaction_ids(), "date": t.ingest_time.isoformat() } - # add the transaction ids: - t_ids = [] - for t in h.transactions: - t_ids.append(t.transaction_id) - - ret_dict["transactions"] = t_ids ret_list.append(ret_dict) # add the return list to successfully completed holding listings body[self.MSG_DATA][self.MSG_HOLDING_LIST] = ret_list @@ -644,6 +677,7 @@ def _catalog_list(self, body: dict, properties: Header) -> None: correlation_id=properties.correlation_id ) + def _catalog_stat(self, body: dict, properties: Header) -> None: """Get the labels for a list of transaction ids""" # get the user id from the details section of the message @@ -722,6 +756,7 @@ def _catalog_stat(self, body: dict, properties: Header) -> None: correlation_id=properties.correlation_id ) + def _catalog_find(self, body: dict, properties: Header) -> None: """List the user's files""" # get the user id from the details section of the message @@ -899,63 +934,63 @@ def _catalog_meta(self, body: dict, properties: Header) -> None: except KeyError: new_tag = None + # get the deleted tag(s) from the new_meta section of the message + try: + del_tag = body[self.MSG_META][self.MSG_NEW_META][self.MSG_DEL_TAG] + except KeyError: + del_tag = None + self.catalog.start_session() # if there is the holding label or holding id then get the holding try: - if not holding_label and not holding_id: + if not holding_label and not holding_id and not tag: raise CatalogError( - "Holding not found: holding_id or label not specified" + "Holding not found: holding_id or label or tag(s) not specified." 
) holdings = self.catalog.get_holding( - user, group, holding_label, holding_id, tag + user, group, holding_label, holding_id, tag=tag ) + ret_list = [] + for holding in holdings: + # get the old metadata so we can record it, then modify + old_meta = { + "label": holding.label, + "tags" : holding.get_tags() + } + holding = self.catalog.modify_holding( + holding, new_label, new_tag, del_tag + ) + # record the new metadata + new_meta = { + "label": holding.label, + "tags": holding.get_tags() + } + # build the return dictionary and append it to the list of + # holdings that have been modified + ret_dict = { + "id": holding.id, + "user": holding.user, + "group": holding.group, + "old_meta" : old_meta, + "new_meta" : new_meta, + } + ret_list.append(ret_dict) - if len(holdings) > 1: - if holding_label: - raise CatalogError( - f"More than one holding returned for label:" - f"{holding_label}" - ) - elif holding_id: - raise CatalogError( - f"More than one holding returned for holding_id:" - f"{holding_id}" - ) - else: - holding = holdings[0] - - old_meta = { - "label": holding.label, - "tags": {t.key:t.value for t in holding.tags} - } - holding = self.catalog.modify_holding( - holding, new_label, new_tag - ) + self.catalog.save() except CatalogError as e: # failed to get the holdings - send a return message saying so self.log(e.message, self.RK_LOG_ERROR) body[self.MSG_DETAILS][self.MSG_FAILURE] = e.message body[self.MSG_DATA][self.MSG_HOLDING_LIST] = [] else: - # fill the return message with a dictionary of the holding - ret_dict = { - "id": holding.id, - "user": holding.user, - "group": holding.group, - "old_meta" : old_meta, - "new_meta" : { - "label": holding.label, - "tags": holding.tags - } - } - body[self.MSG_DATA][self.MSG_HOLDING_LIST] = [ret_dict] + # fill the return message with a dictionary of the holding(s) + body[self.MSG_DATA][self.MSG_HOLDING_LIST] = ret_list self.log( - f"Modified metadata from CATALOG_META {ret_dict}", + f"Modified metadata from CATALOG_META {ret_list}", self.RK_LOG_DEBUG ) - self.catalog.save() self.catalog.end_session() # return message to complete RPC diff --git a/nlds_processors/index.py b/nlds_processors/index.py index be01416e..83db31ec 100644 --- a/nlds_processors/index.py +++ b/nlds_processors/index.py @@ -170,7 +170,7 @@ def index(self, raw_filelist: List[NamedTuple], rk_origin: str, # If any items has exceeded the maximum number of retries we add it # to the dead-end failed list - if path_details.retries > self.max_retries: + if path_details.retries.count > self.max_retries: # Append to failed list (in self) and send back to exchange if # the appropriate size. self.log(f"{path_details.path} has exceeded max retry count, " @@ -194,9 +194,7 @@ def index(self, raw_filelist: List[NamedTuple], rk_origin: str, # Increment retry counter and add to retry list reason = (f"Path:{path_details.path} is inaccessible.") self.log(reason, self.RK_LOG_DEBUG) - path_details.increment_retry( - retry_reason=reason - ) + path_details.retries.increment(reason=reason) self.append_and_send( path_details, rk_retry, body_json, list_type="retry" ) @@ -253,9 +251,7 @@ def index(self, raw_filelist: List[NamedTuple], rk_origin: str, f"Path:{walk_path_details.path} is inaccessible." 
) self.log(reason, self.RK_LOG_DEBUG) - walk_path_details.increment_retry( - retry_reason=reason - ) + walk_path_details.retries.increment(reason=reason) self.append_and_send( walk_path_details, rk_retry, body_json, list_type="retry" @@ -277,9 +273,7 @@ def index(self, raw_filelist: List[NamedTuple], rk_origin: str, else: reason = f"Path:{path_details.path} is of unknown type." self.log(reason, self.RK_LOG_DEBUG) - path_details.increment_retry( - retry_reason=reason - ) + path_details.retries.increment(reason=reason) self.append_and_send( path_details, rk_retry, body_json, list_type="retry" ) diff --git a/nlds_processors/monitor/monitor.py b/nlds_processors/monitor/monitor.py index c26b0f70..1aa5bad9 100644 --- a/nlds_processors/monitor/monitor.py +++ b/nlds_processors/monitor/monitor.py @@ -1,9 +1,11 @@ +from typing import List + from sqlalchemy import create_engine, func, Enum from sqlalchemy.exc import ArgumentError, IntegrityError, OperationalError from sqlalchemy.orm import Session from nlds_processors.monitor.monitor_models import MonitorBase, TransactionRecord -from nlds_processors.monitor.monitor_models import SubRecord, FailedFile +from nlds_processors.monitor.monitor_models import SubRecord, FailedFile, Warning from nlds.rabbit.consumer import State from nlds.details import PathDetails @@ -136,11 +138,26 @@ def create_sub_record(self, def create_failed_file(self, sub_record: SubRecord, - path_details: PathDetails) -> FailedFile: + path_details: PathDetails, + reason: str = None) -> FailedFile: + """Creates a FailedFile object for the monitoring database. Requires the + input of the parent SubRecord and the PathDetails object of the failed + file in question. Optionally requires a reason str, which will otherwise + be attempted to be taken from the PathDetails object. If no reason can + be found then a MonitorError will be raised. + """ + if reason is None: + if len(path_details.retries.reasons) <= 0: + raise MonitorError( + f"FailedFile for sub_record_id:{sub_record.id} could not be " + "added to the database as no failure reason was supplied. 
" + ) + else: + reason = path_details.retries.reasons[-1] try: failed_file = FailedFile( filepath=path_details.original_path, - reason=path_details.retry_reasons[-1], + reason=reason, sub_record_id=sub_record.id, ) self.session.add(failed_file) @@ -271,4 +288,24 @@ def check_completion(self, except IntegrityError: raise MonitorError( "IntegrityError raised when attempting to get sub_records" - ) \ No newline at end of file + ) + + + def create_warning(self, + transaction_record: TransactionRecord, + warning: str) -> Warning: + """Create a warning and add it to the TransactionRecord""" + assert(self.session != None) + try: + warning = Warning( + warning = warning, + transaction_record_id = transaction_record.id + ) + self.session.add(warning) + self.session.flush() + except (IntegrityError, KeyError): + raise MonitorError( + f"Warning for transaction_record:{transaction_record.id} could " + "not be added to the database" + ) + return warning \ No newline at end of file diff --git a/nlds_processors/monitor/monitor_models.py b/nlds_processors/monitor/monitor_models.py index 9f73b9a5..4965b01e 100644 --- a/nlds_processors/monitor/monitor_models.py +++ b/nlds_processors/monitor/monitor_models.py @@ -28,6 +28,13 @@ class TransactionRecord(MonitorBase): creation_time = Column(DateTime, default=func.now()) # relationship for SubRecords (One to many) sub_records = relationship("SubRecord") + # relationship for Warnings (One to many) + warnings = relationship("Warning", cascade="delete, delete-orphan") + def get_warnings(self): + warnings = [] + for w in self.warnings: + warnings.append(w.warning) + return warnings class SubRecord(MonitorBase): @@ -76,6 +83,15 @@ class FailedFile(MonitorBase): sub_record_id = Column(Integer, ForeignKey("sub_record.id"), index=True, nullable=False) +class Warning(MonitorBase): + __tablename__ = "warning" + + # just two columns - primary key and warning string + id = Column(Integer, primary_key=True) + warning = Column(String) + # link to transaction record warning about + transaction_record_id = Column(Integer, ForeignKey("transaction_record.id"), + index=True, nullable=False) def orm_to_dict(obj): retdict = obj.__dict__ diff --git a/nlds_processors/monitor/monitor_worker.py b/nlds_processors/monitor/monitor_worker.py index 7b9aa6c5..b42e6e93 100644 --- a/nlds_processors/monitor/monitor_worker.py +++ b/nlds_processors/monitor/monitor_worker.py @@ -33,6 +33,7 @@ from nlds.rabbit.consumer import RabbitMQConsumer as RMQC from nlds.rabbit.consumer import State +from nlds.details import Retries from nlds_processors.monitor.monitor import Monitor, MonitorError from nlds_processors.monitor.monitor_models import orm_to_dict from nlds_processors.db_mixin import DBError @@ -146,6 +147,22 @@ def _monitor_put(self, body: Dict[str, str]) -> None: except KeyError: self.log("No retry_fl found in message, assuming false.", self.RK_LOG_DEBUG) + + # Get the transaction-level retry + try: + trans_retries = Retries.from_dict(body) + except KeyError: + self.log("No retries found in message, continuing with an empty ", + "Retries object.", self.RK_LOG_DEBUG) + trans_retries = Retries() + + # get the warning(s) from the details section of the message + try: + warnings = body[self.MSG_DETAILS][self.MSG_WARNING] + except KeyError: + self.log("No warning found in message, continuing without", + self.RK_LOG_DEBUG) + warnings = [] # start the database transactions self.monitor.start_session() @@ -153,7 +170,7 @@ def _monitor_put(self, body: Dict[str, str]) -> None: # For any given monitoring 
update, we need to: # - find the transaction record (create if not present) # - update the subrecord(s) associated with it - # - find an exisiting + # - find an existing # - see if it matches sub_id in message # - update it if it does # - change state @@ -176,12 +193,17 @@ def _monitor_put(self, body: Dict[str, str]) -> None: try: trec = self.monitor.create_transaction_record( user, group, transaction_id, job_label, api_action - ) + ) except MonitorError as e: self.log(e.message, RMQC.RK_LOG_ERROR) else: trec = trec[0] + # create any warnings if there are any + if warnings and len(warnings) > 0: + for w in warnings: + warning = self.monitor.create_warning(trec, w) + try: srec = self.monitor.get_sub_record(sub_id) except MonitorError: @@ -214,8 +236,15 @@ def _monitor_put(self, body: Dict[str, str]) -> None: # Create failed_files if necessary if state == State.FAILED: try: - for path_details in filelist: - self.monitor.create_failed_file(srec, path_details) + # Passing reason as None to create_failed_file will default to + # to the last reason in the PathDetails object retries section. + reason = None + for pd in filelist: + # Check which was the final reason for failure and store + # that as the failure reason for the FailedFile. + if len(trans_retries.reasons) > len(pd.retries.reasons): + reason = trans_retries.reasons[-1] + self.monitor.create_failed_file(srec, pd, reason=reason) except MonitorError as e: self.log(e, self.RK_LOG_ERROR) @@ -346,7 +375,7 @@ def _monitor_get(self, body: Dict[str, str], properties: Header) -> None: # get the desired retry_count from the DETAILS section of the message try: - retry_count = int(body[self.MSG_DETAILS][self.MSG_RETRY_COUNT]) + retry_count = int(body[self.MSG_DETAILS][self.MSG_RETRY_COUNT_QUERY]) except (KeyError, TypeError): self.log("Transaction sub-id not in message, continuing without.", self.RK_LOG_INFO) @@ -383,6 +412,7 @@ def _monitor_get(self, body: Dict[str, str], properties: Header) -> None: "job_label": tr.job_label, "api_action": tr.api_action, "creation_time": tr.creation_time.isoformat(), + "warnings": tr.get_warnings(), "sub_records" : [] } trecs_dict[tr.id] = t_rec diff --git a/nlds_processors/nlds_worker.py b/nlds_processors/nlds_worker.py index c7db4805..c0a7e9df 100644 --- a/nlds_processors/nlds_worker.py +++ b/nlds_processors/nlds_worker.py @@ -112,7 +112,8 @@ def _process_rk_list(self, body_json: dict) -> None: self.log(f"Sending message to {queue} queue with routing " f"key {new_routing_key}", self.RK_LOG_INFO) self.publish_and_log_message(new_routing_key, body_json) - + + def _process_rk_index_complete(self, body_json: dict) -> None: # forward to catalog-put on the catalog_q self.log(f"Index successful, sending file list for cataloguing.", @@ -125,6 +126,7 @@ def _process_rk_index_complete(self, body_json: dict) -> None: f"key {new_routing_key}", self.RK_LOG_INFO) self.publish_and_log_message(new_routing_key, body_json) + def _process_rk_catalog_put_complete(self, body_json: dict) -> None: self.log(f"Catalog successful, sending filelist for transfer", self.RK_LOG_INFO) @@ -137,12 +139,14 @@ def _process_rk_catalog_put_complete(self, body_json: dict) -> None: f"key {new_routing_key}", self.RK_LOG_INFO) self.publish_and_log_message(new_routing_key, body_json) + def _process_rk_transfer_put_complete(self, body_json: dict) -> None: # Nothing happens after a successful transfer anymore, so we leave this # empty in case any future messages are required (queueing archive for # example) pass + def 
_process_rk_transfer_put_failed(self, body_json: dict) -> None: self.log(f"Transfer unsuccessful, sending failed files back to catalog " "for deletion", @@ -217,6 +221,7 @@ def callback(self, ch: Channel, method: Method, properties: Header, self.log(f"Worker callback complete!", self.RK_LOG_INFO) + def publish_and_log_message(self, routing_key: str, msg: dict, log_fl=True) -> None: """ diff --git a/nlds_processors/transferers/get_transfer.py b/nlds_processors/transferers/get_transfer.py index fe9a169b..9d54acca 100644 --- a/nlds_processors/transferers/get_transfer.py +++ b/nlds_processors/transferers/get_transfer.py @@ -55,7 +55,7 @@ def transfer(self, transaction_id: str, tenancy: str, access_key: str, rk_failed = ".".join([rk_origin, self.RK_TRANSFER_GET, self.RK_FAILED]) for path_details in filelist: - if path_details.retries > self.max_retries: + if path_details.retries.count > self.max_retries: self.append_and_send( path_details, rk_failed, body_json, list_type="failed" ) @@ -71,9 +71,7 @@ def transfer(self, transaction_id: str, tenancy: str, access_key: str, self.log(f"{reason}, adding " f"{path_details.object_name} to retry list.", self.RK_LOG_INFO) - path_details.increment_retry( - retry_reason=reason - ) + path_details.retries.increment(reason=reason) self.append_and_send( path_details, rk_failed, body_json, list_type="retry" ) @@ -85,9 +83,7 @@ def transfer(self, transaction_id: str, tenancy: str, access_key: str, "buckets") self.log(f"{reason}. Adding {object_name} to retry list.", self.RK_LOG_ERROR) - path_details.increment_retry( - retry_reason=reason - ) + path_details.retries.increment(reason=reason) self.append_and_send( path_details, rk_failed, body_json, list_type="retry" ) @@ -110,9 +106,7 @@ def transfer(self, transaction_id: str, tenancy: str, access_key: str, "path is inaccessible.") self.log(f"{reason}. 
Adding to retry-list.", self.RK_LOG_INFO) - path_details.increment_retry( - retry_reason=reason - ) + path_details.retries.increment(reason=reason) self.append_and_send(path_details, rk_retry, body_json, list_type=FilelistType.retry) continue @@ -139,9 +133,7 @@ def transfer(self, transaction_id: str, tenancy: str, access_key: str, self.log(reason, self.RK_LOG_DEBUG) self.log(f"Exception encountered during download, adding " f"{object_name} to retry-list.", self.RK_LOG_INFO) - path_details.increment_retry( - retry_reason=reason - ) + path_details.retries.increment(reason=reason) self.append_and_send(path_details, rk_retry, body_json, list_type=FilelistType.retry) continue diff --git a/nlds_processors/transferers/put_transfer.py b/nlds_processors/transferers/put_transfer.py index dff1722f..506910dc 100644 --- a/nlds_processors/transferers/put_transfer.py +++ b/nlds_processors/transferers/put_transfer.py @@ -50,7 +50,7 @@ def transfer(self, transaction_id: str, tenancy: str, access_key: str, item_path = path_details.path # First check whether index item has failed too many times - if path_details.retries > self.max_retries: + if path_details.retries.count > self.max_retries: self.append_and_send( path_details, rk_failed, body_json, list_type="failed" ) @@ -62,9 +62,7 @@ def transfer(self, transaction_id: str, tenancy: str, access_key: str, not self.check_path_access(item_path)): reason = (f"Path:{path_details.path} is inaccessible.") self.log(reason, self.RK_LOG_DEBUG) - path_details.increment_retry( - retry_reason=reason - ) + path_details.retries.increment(reason=reason) self.append_and_send( path_details, rk_retry, body_json, list_type="retry" ) @@ -97,7 +95,7 @@ def transfer(self, transaction_id: str, tenancy: str, access_key: str, reason = (f"Error uploading {path_details.path} to object " f"store: {e}.") self.log(f"{reason} Adding to retry list.", self.RK_LOG_ERROR) - path_details.increment_retry(retry_reason=reason) + path_details.retries.increment(reason=reason) self.append_and_send( path_details, rk_retry, body_json, list_type="retry" ) diff --git a/test_run/test_run.rc b/test_run/test_run.rc index a2d9455f..fde73f78 100644 --- a/test_run/test_run.rc +++ b/test_run/test_run.rc @@ -35,8 +35,9 @@ focus # right pos 4 # create the logger -screen -t "logger" -exec "$PYTHON_DIR/python" "$NLDS_PROC/logger.py" +# run the server via uvicorn +screen -t "server" +exec "$PYTHON_DIR/uvicorn" "nlds.main:nlds" "--reload" "--log-level=trace" "--port=8000" focus left split focus @@ -56,6 +57,6 @@ split focus # left pos 4 -# run the server via uvicorn -screen -t "server" -exec "$PYTHON_DIR/uvicorn" "nlds.main:nlds" "--reload" "--log-level=trace" "--port=8000" +screen -t "logger" +exec "$PYTHON_DIR/python" "$NLDS_PROC/logger.py" + diff --git a/test_run/test_run_start.sh b/test_run/test_run_start.sh deleted file mode 100755 index ac770269..00000000 --- a/test_run/test_run_start.sh +++ /dev/null @@ -1,9 +0,0 @@ -#! /usr/bin/env bash -if [[ ! -d ~/nlds_log ]] -then - mkdir ~/nlds_log -fi - -source $HOME/python-venvs/nlds-venv/bin/activate -# start a named screen session -screen -S nlds -c test_run.rc diff --git a/test_run/test_run_stop.sh b/test_run/test_run_stop.sh deleted file mode 100755 index 0c7f11c8..00000000 --- a/test_run/test_run_stop.sh +++ /dev/null @@ -1,3 +0,0 @@ -#! 
/usr/bin/env bash -screen -d nlds -screen -r nlds -X quit diff --git a/tests/conftest.py b/tests/conftest.py index 969f6856..4ded5987 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,7 +7,7 @@ import pytest from nlds.rabbit.publisher import RabbitMQPublisher as RMQP -from nlds.details import PathDetails +from nlds.details import PathDetails, Retries TEMPLATE_CONFIG_PATH = os.path.join(os.path.dirname(__file__), @@ -52,6 +52,7 @@ def default_rmq_body(test_uuid): # Convert to PathDetails for JSON serialisation RMQP.MSG_FILELIST: [PathDetails(original_path="item_path"),], }, + **Retries().to_dict(), RMQP.MSG_TYPE: RMQP.MSG_TYPE_STANDARD } return json.dumps(msg_dict) diff --git a/tests/nlds/test_details.py b/tests/nlds/test_details.py index f66318e8..0e4648aa 100644 --- a/tests/nlds/test_details.py +++ b/tests/nlds/test_details.py @@ -3,9 +3,84 @@ import pytest -from nlds.details import PathDetails +from nlds.details import PathDetails, Retries from nlds.utils.permissions import check_permissions +def test_retries(): + retries = Retries() + + # Check that incrementing without a reason works + assert retries.count == 0 + assert len(retries.reasons) == 0 + retries.increment() + assert retries.count == 1 + assert len(retries.reasons) == 0 + + # Check that reset actually resets the count + retries.reset() + assert retries.count == 0 + assert len(retries.reasons) == 0 + + # Try incrementing with a reason + retries.increment(reason='Test retry') + assert retries.count == 1 + assert len(retries.reasons) == 1 + + # Try incrementing with another reason + retries.increment(reason='Different test reason') + assert retries.count == 2 + assert len(retries.reasons) == 2 + + # Check that reset does indeed work for a list of 2 reasons + retries.reset() + assert retries.count == 0 + assert len(retries.reasons) == 0 + + # See what happens if we try a non-string reason + retries.increment(reason=1) + assert retries.count == 1 + assert len(retries.reasons) == 1 + + # A None should be interpreted as 'not a reason' so shouldn't add to the + # reasons list + retries.increment(reason=None) + assert retries.count == 2 + assert len(retries.reasons) == 1 + + # A 0 should probably count as a reason, but currently doesn't + retries.increment(reason=0) + assert retries.count == 3 + assert len(retries.reasons) == 1 + + # Reset for testing dictionary conversion? Not necessary + # retries.reset() + # assert retries.count == 0 + # assert len(retries.reasons) == 0 + + # Convert to dict and check integrity of output. 
+ # Should be in an outer dict called 'retries' + r_dict = retries.to_dict() + assert 'retries' in r_dict + + # Should contain a count and a reasons list + assert 'count' in r_dict['retries'] + assert isinstance(r_dict['retries']['count'], int) + + assert 'reasons' in r_dict['retries'] + assert isinstance(r_dict['retries']['reasons'], list) + + # Attempt to make a Retries object from the dictionary + new_retries = Retries.from_dict(r_dict) + assert new_retries.count == 3 + assert len(new_retries.reasons) == 1 + + # Attempt alternative constructor usage + alt_retries = Retries(**r_dict['retries']) + assert alt_retries.count == 3 + assert len(alt_retries.reasons) == 1 + + assert new_retries == alt_retries + def test_path_details(): # Attempt to make a path details object pd = PathDetails(original_path=__file__) @@ -30,9 +105,14 @@ def test_path_details(): # Test that creation from a stat_result is the same as the original object stat_result = Path(__file__).lstat() - pd_from_stat = PathDetails.from_stat(__file__, stat_result=stat_result) + pd_from_stat = PathDetails.from_stat_result(__file__, stat_result=stat_result) assert pd_from_stat == pd + # Similarly check that the from_path method creates an equivalent + # path_details object + pd_from_path = PathDetails.from_path(__file__) + assert pd_from_path == pd + # Check the approximated stat_result from the get_stat_result() method sr_from_pd = pd.get_stat_result() assert sr_from_pd.st_mode == stat_result.st_mode @@ -46,3 +126,12 @@ def test_path_details(): check_permissions(20, [100, ], path=__file__) == check_permissions(20, [100, ], stat_result=sr_from_pd) ) + + # Check that from_dict() and to_json() work + pd_json = pd.to_json() + pd_from_json = PathDetails.from_dict(pd_json) + assert pd == pd_from_json + + # Check contents of json? 
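# Editor's note (illustrative): read together with test_retries() above, the
# serialised PathDetails appears to carry two top-level sections - "file_details"
# for the path metadata and "retries" for the retry state, the latter matching
# the {"retries": {"count": <int>, "reasons": [...]}} shape checked earlier.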
+ assert "file_details" in pd_json + assert "retries" in pd_json \ No newline at end of file diff --git a/tests/nlds_processors/catalog/test_catalog.py b/tests/nlds_processors/catalog/test_catalog.py new file mode 100644 index 00000000..5d3b9384 --- /dev/null +++ b/tests/nlds_processors/catalog/test_catalog.py @@ -0,0 +1,574 @@ +import uuid +import time + +import pytest +from sqlalchemy import func + +from nlds_processors.catalog.catalog_models import ( + CatalogBase, File, Holding, Location, Transaction, Storage, Checksum, Tag +) +from nlds_processors.catalog.catalog import Catalog, CatalogError +from nlds.details import PathType + +test_uuid = '00a246cf-e2a8-46f0-baca-be3972fc4034' + +@pytest.fixture() +def mock_catalog(): + # Manually set some settings for a separate test db + db_engine = "sqlite" + db_options = { + "db_name" : "", + "db_user" : "", + "db_passwd" : "", + "echo": False + } + catalog = Catalog(db_engine, db_options) + catalog.connect() + catalog.start_session() + yield catalog + catalog.save() + catalog.end_session() + +@pytest.fixture() +def mock_holding(): + return Holding( + label='test-label', + user='test-user', + group='test-group', + ) + +@pytest.fixture() +def mock_transaction(): + return Transaction( + holding_id=None, + transaction_id=test_uuid, + ingest_time=func.now(), + ) + +@pytest.fixture() +def mock_file(): + new_file = File( + transaction_id=None, + original_path='/test/path', + path_type=PathType['FILE'], + link_path = None, + size=1050, + user='test-user', + group='test-group', + file_permissions='0o01577' + ) + return new_file + +@pytest.fixture() +def filled_mock_catalog(mock_catalog, mock_holding, mock_transaction): + mock_catalog.session.add(mock_holding) + mock_catalog.session.commit() + + mock_catalog.session.add(mock_transaction) + mock_catalog.session.commit() + + yield mock_catalog + + +class TestCatalog: + + def test_get_holding(self, mock_catalog): + # try on an empty database, should fail + with pytest.raises(CatalogError): + holding = mock_catalog.get_holding(user='', group='') + with pytest.raises(CatalogError): + holding = mock_catalog.get_holding(user='asgasg', group='assag') + + # First need to add a valid holding to the db + valid_holding = Holding( + label='test-label', + user='test-user', + group='test-group', + ) + mock_catalog.session.add(valid_holding) + mock_catalog.session.commit() + + # Attempt to get the valid holding from the test db + # Should work with just the user and group + holding = mock_catalog.get_holding(user='test-user', + group='test-group') + assert len(holding) == 1 + + # Should similarly work with correct label or regex search for label + holding = mock_catalog.get_holding(user='test-user', group='test-group', + label='test-label') + holding = mock_catalog.get_holding(user='test-user', group='test-group', + label='.*') + + # Try with incorrect information, should fail + with pytest.raises(CatalogError): + holding = mock_catalog.get_holding(user='incorrect-user', + group='test-group') + with pytest.raises(CatalogError): + holding = mock_catalog.get_holding(user='test-user', + group='incorrect-group') + with pytest.raises(CatalogError): + holding = mock_catalog.get_holding(user='incorrect-user', + group='incorrect-group') + with pytest.raises(CatalogError): + holding = mock_catalog.get_holding(user='test-user', + group='test-group', + label='incorrect-label') + # Should also fail with incorrect regex in label + with pytest.raises(CatalogError): + holding = mock_catalog.get_holding(user='test-user', + 
group='test-group', + label='*') + + # We can now add another Holding with the same user and group, but a + # different label and attempt to get that. + # First need to add a valid holding to the db + valid_holding = Holding( + label='test-label-2', + user='test-user', + group='test-group', + ) + mock_catalog.session.add(valid_holding) + mock_catalog.session.commit() + + # Should work with correct label + holding = mock_catalog.get_holding(user='test-user', group='test-group', + label='test-label') + # NOTE: returns 2 because test-label appears in both labels and this is + # automatically getting regexed + assert len(holding) == 2 + # Have to use proper regex to just get 1 + holding = mock_catalog.get_holding(user='test-user', group='test-group', + label='test-label$') + assert len(holding) == 1 + + # And with no label specified? + holding = mock_catalog.get_holding(user='test-user', group='test-group') + assert len(holding) == 2 + + # And with regex + holding = mock_catalog.get_holding(user='test-user', group='test-group', + label='test-label') + assert len(holding) == 2 + + # Can try getting by other parameters like holding_id and tag + holding = mock_catalog.get_holding(user='test-user', group='test-group', + holding_id=1) + assert len(holding) == 1 + holding = mock_catalog.get_holding(user='test-user', group='test-group', + holding_id=2) + assert len(holding) == 1 + + with pytest.raises(CatalogError): + mock_catalog.get_holding(user='test-user', group='test-group', + holding_id=3) + # TODO: tag logic? + + + def test_create_holding(self, mock_catalog): + # Can attempt to create a holding and then get it + mock_catalog.create_holding(user='test-user', group='test-group', + label='test-label') + # NOTE: Would be wise to directly interface with the database here to + # make sure the unit-test is isolated? + holding = mock_catalog.session.query(Holding).filter( + Holding.user == 'test-user', + Holding.group == 'test-group', + Holding.label == 'test-label', + ).all() + assert len(holding) == 1 + + # Attempt to add another identical item, should fail + with pytest.raises(CatalogError): + mock_catalog.create_holding(user='test-user', group='test-group', + label='test-label') + # need to rollback as the session is otherwise stuck + mock_catalog.session.rollback() + + + def test_modify_holding_with_empty_db(self, mock_catalog): + # modify_holding requires passing a Holding object to be passed in, so + # here we'll try with a None instead. Attempting to modify an invalid + # holding, should result in a CatalogError + with pytest.raises(CatalogError): + holding = mock_catalog.modify_holding(None, new_label='new-label') + # NOTE: the following fail with AttributeErrors as they're not properly + # input filtering, might be good to be more robust here and make these + # return catalog errors? 
+ with pytest.raises(CatalogError): + holding = mock_catalog.modify_holding(None, new_tags={'key': 'val'}) + with pytest.raises(CatalogError): + holding = mock_catalog.modify_holding(None, del_tags={'key': 'val'}) + + + def test_modify_holding(self, mock_catalog): + # create a holding to modify + valid_holding = Holding( + label='test-label', + user='test-user', + group='test-group', + ) + # Before it's committed to the database, see if we can modify the label + holding = mock_catalog.modify_holding(valid_holding, + new_label='new-label') + assert holding.label == 'new-label' + # Commit and then attempt to modify again + mock_catalog.session.add(valid_holding) + mock_catalog.session.commit() + + holding = mock_catalog.modify_holding(valid_holding, + new_label='new-label-2') + assert holding.label == 'new-label-2' + # Attempting to change or delete the tags of a tagless holding should + # create new tags + valid_holding = mock_catalog.modify_holding(valid_holding, + new_tags={'key': 'val'}) + assert len(valid_holding.tags) == 1 + # Get the tags as a dict + tags = valid_holding.get_tags() + assert 'key' in tags + assert tags['key'] == 'val' + + # Can now modify the existing tag and check it's worked + valid_holding = mock_catalog.modify_holding(valid_holding, + new_tags={'key': 'newval'}) + assert len(valid_holding.tags) == 1 + # Get the tags as a dict + tags = valid_holding.get_tags() + assert 'key' in tags + assert tags['key'] == 'newval' + + # Can now attempt to remove the tags. + # Removing the tag that exists but has a different value should work but + # not do anything + valid_holding = mock_catalog.modify_holding(valid_holding, + del_tags={'key': 'val'}) + assert len(valid_holding.tags) == 1 + # Need to commit for changes to take effect + mock_catalog.session.commit() + assert len(valid_holding.tags) == 1 + + # Removing the actual tag should work + valid_holding = mock_catalog.modify_holding(valid_holding, + del_tags={'key': 'newval'}) + assert len(valid_holding.tags) == 1 + # Need to commit for changes to take effect + mock_catalog.session.commit() + assert len(valid_holding.tags) == 0 + + # Deleting a tag that doesn't exist shouldn't work + with pytest.raises(CatalogError): + valid_holding = mock_catalog.modify_holding(valid_holding, + del_tags={'key': 'val'}) + assert len(valid_holding.tags) == 0 + # Deleting or modifying without a valid dict should also break. Unlikely + # to occur in practice and raises a Type error which would be caught by + # the consumer. + with pytest.raises(TypeError): + valid_holding = mock_catalog.modify_holding(valid_holding, + new_tags='String') + # TODO: interestingly this raises a CatalogError, as opposed to a + # TypeError, should probably standardise this. + with pytest.raises(CatalogError): + valid_holding = mock_catalog.modify_holding(valid_holding, + del_tags='String') + + # Finally, attempting to do all of the actions at once should work the + # same as individually + holding = mock_catalog.modify_holding(valid_holding, + new_label='new-label-3', + new_tags={'key-3': 'val-3'}, + del_tags={'key-3': 'val-3'}) + assert holding.label == 'new-label-3' + # Interestingly, the all-at-once behaviour is different from the one-at- + # a time behaviour: the tag is deleted without the need for a commit. + # TODO: should look into this! 
+ assert len(holding.tags) == 0 + + + def test_get_transaction(self, mock_catalog): + test_uuid = str(uuid.uuid4()) + + # try on an empty database, should probably fail but currently doesn't + # because of the one_or_none() on the query in the function. + transaction = mock_catalog.get_transaction(id=1) + assert transaction is None + transaction = mock_catalog.get_transaction(transaction_id=test_uuid) + assert transaction is None + + # add a transaction to later get + transaction = Transaction( + holding_id = 2, # doesn't exist yet + transaction_id = test_uuid, + ) + mock_catalog.session.add(transaction) + mock_catalog.session.commit() + + # Should be able to get by id or transaction_id + g_transaction = mock_catalog.get_transaction(id=1) + assert transaction == g_transaction + assert transaction.id == g_transaction.id + assert transaction.transaction_id == g_transaction.transaction_id + g_transaction = mock_catalog.get_transaction(transaction_id=test_uuid) + assert transaction == g_transaction + assert transaction.id == g_transaction.id + assert transaction.transaction_id == g_transaction.transaction_id + + # Try getting a non-existent transaction now we have something in the db + other_uuid = str(uuid.uuid4) + g_transaction = mock_catalog.get_transaction(transaction_id=other_uuid) + assert g_transaction is None + + + def test_create_transaction(self, mock_catalog, mock_holding): + test_uuid = str(uuid.uuid4()) + test_uuid_2 = str(uuid.uuid4()) + + # Attempt to create a new transaction, technically requires a Holding. + # First try with no holding, fails when it tries to reference the + # holding + with pytest.raises(AttributeError): + transaction = mock_catalog.create_transaction( + holding=None, + transaction_id=test_uuid + ) + + # create a holding to create the transaction with + holding = mock_holding + mock_catalog.session.add(holding) + mock_catalog.session.commit() + + # Can now make as many transactions as we want, assuming different uuids + transaction = mock_catalog.create_transaction(holding, test_uuid) + transaction_2 = mock_catalog.create_transaction(holding, test_uuid_2) + assert transaction != transaction_2 + + # Using the same uuid should probbaly result in an error but currently + # doesn't + # with pytest.raises(CatalogError): + transaction_3 = mock_catalog.create_transaction(holding, test_uuid) + + + def test_user_has_get_holding_permission(self): + # Leaving this for now until it's a bit more fleshed out + pass + + def test_user_has_get_file_permission(self): + # Leaving this for now until it's a bit more fleshed out + pass + + def test_get_file(self, mock_catalog, mock_holding, mock_transaction, + mock_file): + test_uuid = str(uuid.uuid4()) + catalog = mock_catalog + + # Add mock holding and transaction to db so ids are populated. + catalog.session.add(mock_holding) + catalog.session.flush() + mock_transaction.holding_id = mock_holding.id + catalog.session.add(mock_transaction) + catalog.session.flush() + + # Getting shouldn't work on an empty database + with pytest.raises(CatalogError): + file_ = catalog.get_file('test-user', 'test-group', '/test/path') + # Similarly shouldn't work if we specify a holding + with pytest.raises(CatalogError): + file_ = catalog.get_file('test-user', 'test-group', '/test/path', + holding=mock_holding) + + # But it will return without a fault if the flag is provided. 
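# (Editor's note: "the flag" here is the missing_error_fl keyword shown below -
#  passing missing_error_fl=False suppresses the CatalogError and makes
#  get_file() return None when no matching file exists.)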
+ file_ = catalog.get_file('test-user', 'test-group', '/test/path', + missing_error_fl=False) + assert file_ is None + + # And, similarly, should return None if we do the same for a specific + # holding + file_ = catalog.get_file('test-user', 'test-group', '/test/path', + holding=mock_holding, missing_error_fl=False) + assert file_ is None + + # Make a file for us to get + new_file = mock_file + new_file.transaction_id = mock_transaction.id + catalog.session.add(new_file) + catalog.session.commit() + + # Should now work + file_ = catalog.get_file('test-user', 'test-group', '/test/path') + # Should still work if we provide a holding + same_file = catalog.get_file('test-user', 'test-group', '/test/path', + mock_holding) + assert file_ == same_file + + # Now we can add another holding, transaction and file with the same + # path to test which one is provided + holding_2 = Holding( + label='test-label-2', + user='test-user', + group='test-group', + ) + catalog.session.add(holding_2) + catalog.session.flush() + + # Sleep so that the ingest time is different t + time.sleep(1) + transaction_2 = Transaction( + holding_id=holding_2.id, + transaction_id=test_uuid, + ingest_time=func.now(), + ) + catalog.session.add(transaction_2) + catalog.session.flush() + file_2 = File( + transaction_id=transaction_2.id, + original_path='/test/path', + path_type=PathType['FILE'], + link_path = None, + size=99999, + user='test-user', + group='test-group', + file_permissions='0o01577' + ) + catalog.session.add(file_2) + catalog.session.commit() + + # Verify that the ingest times are different else the test won't work + assert transaction_2.ingest_time > mock_transaction.ingest_time + + # Should now get the most recently added file (with the large size) + g_file = catalog.get_file('test-user', 'test-group', '/test/path') + assert g_file.size == 99999 + + # Should still be able to get the first file if we provide a holding + first_file = catalog.get_file('test-user', 'test-group', '/test/path', + mock_holding) + assert first_file.size == 1050 + + # An invalid path should raise an error too + with pytest.raises(CatalogError): + files = catalog.get_file('test-user', 'test-group', 'invalidpath') + + + def test_get_files(self, mock_catalog, mock_holding, mock_transaction, + mock_file): + test_uuid = str(uuid.uuid4()) + catalog = mock_catalog + + # Getting shouldn't work on an empty database + with pytest.raises(CatalogError): + files = catalog.get_files('test-user', 'test-group') + # Try with garbage input + with pytest.raises(CatalogError): + files = catalog.get_files('ihasidg', 'oihaosifh') + # Try with reasonable values in all optional kwargs + with pytest.raises(CatalogError): + files = catalog.get_files( + 'test-user', + 'test-group', + holding_label='test-label', + holding_id=1, + transaction_id=test_uuid, + path='/test/path', + tag={'key': 'val'}, + ) + # Try with garbage in all optional kwargs + with pytest.raises(CatalogError): + files = catalog.get_files( + 'asgad', + 'agdasd', + holding_label='ououg', + holding_id='asfasf', + transaction_id='adgouihoih', + path='oihosidhag', + tag={'aegaa': 'as'}, + ) + + # Add mock holding and transaction to db so ids are populated. 
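# (Editor's note: flush() sends the INSERTs without committing, so the primary
#  keys are populated and mock_transaction.holding_id can reference
#  mock_holding.id while the fixture's session stays open.)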
+ catalog.session.add(mock_holding) + catalog.session.flush() + mock_transaction.holding_id = mock_holding.id + catalog.session.add(mock_transaction) + catalog.session.flush() + + # Getting still shouldn't work on the now initialised database + with pytest.raises(CatalogError): + files = catalog.get_files('test-user', 'test-group') + # Try with garbage input + with pytest.raises(CatalogError): + files = catalog.get_files('ihasidg', 'oihaosifh') + # Try with reasonable values in all optional kwargs + with pytest.raises(CatalogError): + files = catalog.get_files( + 'test-user', + 'test-group', + holding_label='test-label', + holding_id=1, + transaction_id=test_uuid, + path='/test/path', + tag={'key': 'val'}, + ) + # Try with garbage in all optional kwargs + with pytest.raises(CatalogError): + files = catalog.get_files( + 'asgad', + 'agdasd', + holding_label='ououg', + holding_id='asfasf', + transaction_id='adgouihoih', + path='oihosidhag', + tag={'aegaa': 'as'}, + ) + + # Make a file for us to get + new_file = mock_file + new_file.transaction_id = mock_transaction.id + catalog.session.add(new_file) + catalog.session.commit() + + files = catalog.get_files('test-user', 'test-group') + assert isinstance(files, list) + assert len(files) == 1 + assert files[0] == new_file + + # Add some more files with the same user + for i in range(10): + new_file_2 = File( + transaction_id=mock_transaction.id, + original_path=f'/test/path-{i}', + path_type=PathType['FILE'], + link_path = None, + size=1050, + user='test-user', + group='test-group', + file_permissions='0o01577' + ) + catalog.session.add(new_file_2) + catalog.session.commit() + files = catalog.get_files('test-user', 'test-group') + assert isinstance(files, list) + assert len(files) == i+2 + + def test_create_file(self): + pass + + def test_delete_files(self): + pass + + def test_get_location(self): + pass + + def test_create_location(self): + pass + + def test_create_tag(self): + pass + + def test_get_tag(self): + pass + + def test_modify_tag(self): + pass + + def test_del_tag(self): + pass diff --git a/tests/nlds_processors/catalog/test_catalog_worker.py b/tests/nlds_processors/catalog/test_catalog_worker.py new file mode 100644 index 00000000..cd79a313 --- /dev/null +++ b/tests/nlds_processors/catalog/test_catalog_worker.py @@ -0,0 +1,20 @@ +import pytest +import functools + +from nlds.rabbit import publisher as publ +import nlds.rabbit.consumer as cons +from nlds.details import PathDetails +from nlds_processors.catalog.catalog_worker import CatalogConsumer + +def mock_load_config(template_config): + return template_config + +@pytest.fixture() +def default_catalog(monkeypatch, template_config): + # Ensure template is loaded instead of .server_config + monkeypatch.setattr(publ, "load_config", functools.partial( + mock_load_config, + template_config + ) + ) + return CatalogConsumer() \ No newline at end of file diff --git a/tests/nlds_processors/monitor/test_monitor.py b/tests/nlds_processors/monitor/test_monitor.py new file mode 100644 index 00000000..3a9234f7 --- /dev/null +++ b/tests/nlds_processors/monitor/test_monitor.py @@ -0,0 +1,56 @@ +import pytest + +from nlds_processors.monitor.monitor_models import ( + MonitorBase, TransactionRecord, SubRecord, FailedFile, Warning +) +from nlds_processors.monitor.monitor import Monitor, MonitorError + +@pytest.fixture() +def mock_monitor(): + # Manually set some settings for test db in memory, very basic. 
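# (Editor's note: with the sqlite engine an empty db_name presumably resolves to
#  an in-memory database, so each test gets a fresh, throwaway monitor DB.)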
+ db_engine = "sqlite" + db_options = { + "db_name" : "", + "db_user" : "", + "db_passwd" : "", + "echo": False + } + # Set up + monitor = Monitor(db_engine, db_options) + monitor.connect() + monitor.start_session() + + # Provide to method + yield monitor + + # Tear down + monitor.save() + monitor.end_session() + + +def test_create_transaction_record(mock_monitor): + pass + +def test_get_transaction_record(mock_monitor): + pass + +def test_create_sub_record(mock_monitor): + pass + +def test_get_sub_record(mock_monitor): + pass + +def test_get_sub_records(mock_monitor): + pass + +def test_update_sub_record(mock_monitor): + pass + +def test_create_failed_file(mock_monitor): + pass + +def test_check_completion(mock_monitor): + pass + +def test_create_warning(mock_monitor): + pass diff --git a/tests/nlds_processors/test_db_mixin.py b/tests/nlds_processors/test_db_mixin.py new file mode 100644 index 00000000..f6ff46fc --- /dev/null +++ b/tests/nlds_processors/test_db_mixin.py @@ -0,0 +1,76 @@ +import pytest + +from nlds_processors.db_mixin import DBMixin, DBError + +class MockDBMixinInheritor(DBMixin): + """Mock class for testing DBMixin functions in isolation""" + def __init__(self, db_engine, db_options): + # Create minimum required attributes to create a working inherited class + # of DBMixin + self.db_engine = None + self.db_engine_str = db_engine + self.db_options = db_options + self.sessions = None + +class TestCatalogCreation(): + + def test_connect(self): + # Use non-sensical db_engine + db_engine = "gibberish" + db_options = { + "db_name" : "/test.db", + "db_user" : "", + "db_passwd" : "", + "echo": False + } + mock_dbmixin = MockDBMixinInheritor(db_engine, db_options) + # Should not work as we're trying to use non-sensical db config options + with pytest.raises(DBError): + mock_dbmixin.connect() + + # Use non-sensical db_options + db_engine = "sqlite" + db_options = list() + mock_dbmixin = MockDBMixinInheritor(db_engine, db_options) + # Should not work, but should break when the db_string is made through + # trying to index like a dictionary and then subsequently calling len() + with pytest.raises((KeyError, TypeError)): + mock_dbmixin.connect() + + # What happens if we use an empty dict? 
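# (Editor's note: connect() presumably assembles the connection string by
#  indexing db_options, so the missing keys in an empty dict surface as the
#  KeyError asserted below.)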
+ db_options = dict() + mock_dbmixin = MockDBMixinInheritor(db_engine, db_options) + # Should not work when it tries to index things that don't exist + with pytest.raises(KeyError): + mock_dbmixin.connect() + + # Try creating a database with functional parameters (no username or + # password) in memory + db_engine = "sqlite" + db_options = { + "db_name" : "", + "db_user" : "", + "db_passwd" : "", + "echo": False + } + mock_dbmixin = MockDBMixinInheritor(db_engine, db_options) + # Should not work as we've not got a Base to create tables from + with pytest.raises((AttributeError)): + mock_dbmixin.connect() + + # Try creating a local database with username and password + db_engine = "sqlite" + db_options = { + "db_name" : "/test.db", + "db_user" : "test-un", + "db_passwd" : "test-pwd", + "echo": False + } + mock_dbmixin = MockDBMixinInheritor(db_engine, db_options) + # Should not work as we can't create a db file with a password or + # username using pysqlite (the default sqlite engine) + with pytest.raises((DBError)): + mock_dbmixin.connect() + + # TODO: Test with a mock SQLAlchemy.Base and see if we can break it in + # any interesting ways \ No newline at end of file diff --git a/tests/nlds_processors/test_index.py b/tests/nlds_processors/test_index.py index 6cc092b8..087d0288 100644 --- a/tests/nlds_processors/test_index.py +++ b/tests/nlds_processors/test_index.py @@ -6,7 +6,7 @@ from nlds.rabbit import publisher as publ import nlds.rabbit.statting_consumer as scons -from nlds.details import PathDetails +from nlds.details import PathDetails, Retries from nlds_processors.index import IndexerConsumer def mock_load_config(template_config): @@ -76,7 +76,7 @@ def test_index(monkeypatch, caplog, default_indexer, # Should work with any number of retries under the limit for i in range(default_indexer.max_retries): - test_filelist = [PathDetails(original_path="/test/", retries=i)] + test_filelist = [PathDetails(original_path="/test/", retries=Retries(count=i))] default_indexer.index(test_filelist, 'test', default_rmq_message_dict) assert len(default_indexer.completelist) == len(expected_filelist) @@ -90,7 +90,7 @@ def test_index(monkeypatch, caplog, default_indexer, # All files should be in failed list with any number of retries over the # limit for i in range(default_indexer.max_retries + 1, 10): - test_filelist = [PathDetails(original_path="/test/", retries=i)] + test_filelist = [PathDetails(original_path="/test/", retries=Retries(count=i))] default_indexer.index(test_filelist, 'test', default_rmq_message_dict) assert len(default_indexer.completelist) == 0
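Editor's note: the Retries class used throughout this change set is imported from
nlds.details but its implementation is not part of the diff. The following is a
minimal, illustrative sketch only, reconstructed from the behaviour exercised in
test_retries() and the call sites above (retries.count, retries.increment(reason=...),
Retries.from_dict(body), **Retries().to_dict()); the real class may differ.

from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class RetriesSketch:
    """Hypothetical stand-in for nlds.details.Retries."""
    count: int = 0
    reasons: List[Any] = field(default_factory=list)

    def increment(self, reason: Any = None) -> None:
        # every call counts as a retry; only truthy reasons are recorded,
        # matching the None/0 cases probed in test_retries()
        self.count += 1
        if reason:
            self.reasons.append(reason)

    def reset(self) -> None:
        self.count = 0
        self.reasons = []

    def to_dict(self) -> Dict[str, Any]:
        # nested under an outer "retries" key so it can be merged straight
        # into a message body (see conftest.py above)
        return {"retries": {"count": self.count, "reasons": self.reasons}}

    @classmethod
    def from_dict(cls, body: Dict[str, Any]) -> "RetriesSketch":
        # raises KeyError when the body has no "retries" section, which the
        # monitor worker catches and replaces with an empty Retries()
        return cls(**body["retries"])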