Merge pull request #45 from cedadev/development
Milestone 4 Development: User Interactivity
nmassey001 authored Feb 2, 2023
2 parents 3a04698 + fcfbdd5 commit 318ad04
Showing 66 changed files with 4,520 additions and 1,250 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -21,4 +21,5 @@ dist/
**/.server_config
**/.processor_config
logs/
nlds_catalog.db
nlds_catalog.db
nlds_monitor.db
167 changes: 143 additions & 24 deletions docs/spec/nlds_specification.md
@@ -359,13 +359,17 @@ containing the data required to carry out the processes:
access_key : <string>,
secret_key : <string>
},
meta : {
},
data : {
}
}

All messages retain all parts of the `details` field in the JSON message. This
allows the details of the transaction to be passed from process to process, even
when the process does not require some of the sub-fields in the `details` field.
All messages retain all parts of the `details` and `meta` fields in the JSON
message. This allows the details of the transaction to be passed from process to
process, even when the process does not require some of the sub-fields in the
`details` or `meta` fields. The `data` field can, and will, change between each
process.

The routing keys for the RabbitMQ messages have three components: the calling
application, the worker to act upon and the state or command for the worker.
@@ -495,13 +499,17 @@ URL.
transaction_id : <string>,
user : <string>,
group : <string>,
target : <string> (optional),
tenancy : <string> (optional),
access_key : <string>,
secret_key : <string>
},
data : {
filelist : <list<(json,int)>>
filelist : <list<(json,int)>>,
},
meta : {
label : <string> (optional),
holding_id : <int> (optional),
tag : <dict> (optional)
}
}

@@ -1006,32 +1014,143 @@ WARNING, ERROR, CRITICAL**
`code_line` and `module_name` can be derived from the exception using the
`traceback` module.

# User interaction via the NLDS client / client API

User actions:

## Transfer actions
### PUT a single / list of files
- `put`
- `putlist`
- Required arguments
- `--user`
- `--group`
- `filepath|filelist`
- Optional
  - `--label=`: if a holding with the label already exists then add to it, otherwise create a new holding with that label
- `--holding_id=`: adds to an existing holding with the (integer) id
- `--tag=key:value`: adds a tag to the holding on PUT
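
As an illustration (not the actual client code) of how these options map onto the message format earlier in this spec, a minimal sketch in Python - the function name and shapes are assumptions, and `filelist` is simplified to a plain list of paths rather than the `(json,int)` tuples:

```python
from typing import Optional
import uuid

def build_put_message(user: str, group: str, filelist: list[str],
                      access_key: str, secret_key: str,
                      label: Optional[str] = None,
                      holding_id: Optional[int] = None,
                      tag: Optional[dict] = None) -> dict:
    """Assemble the JSON body for a put/putlist request (sketch only)."""
    meta = {}
    # only include the optional fields the user actually supplied
    if label is not None:
        meta["label"] = label
    if holding_id is not None:
        meta["holding_id"] = holding_id
    if tag is not None:
        meta["tag"] = tag
    return {
        "details": {
            "transaction_id": str(uuid.uuid4()),
            "user": user,
            "group": group,
            "access_key": access_key,
            "secret_key": secret_key,
        },
        "data": {"filelist": filelist},
        "meta": meta,
    }
```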

### GET a single / list of files
- `get`
- `getlist`
- Required arguments
- `--user=`
- `--group=`
- `--target=`
- `filepath|filelist`
- Optional
- `--label=`: get the file from a holding with matching label
- `--holding_id=`: get the file from a holding with the (integer) id
- `--tag=key:value`: get the file from a holding with the matching tag

### DELETE a single / list of files
- `del`
- `dellist`
- Required arguments
- `--user=`
- `--group=`
- `filepath|filelist`

## Query actions

### List the holdings for a user / group
- `list`
- Required arguments
- `--user=`
- Optional arguments
- `--group=`
- `--holding_id=` (integer)
- `--tag=key:value` (filter by tag)
- `--label=` (filter by label)
- `--time=datetime|(start datetime, end datetime)` (time the files were ingested)

### List the files for a user / group
- `find`
- Required arguments
- `--user=`
- Optional arguments
- `--group=`
- `--holding_id=` (integer)
- `--tag=key:value` (filter by tag for the holding containing the files)
- `--label=` (filter by the holding label)
- `--time=datetime|(start datetime, end datetime)` (time the files were ingested)
- `--path=` (filter by original path, can be a substring, regex or wildcard)
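
A possible way of normalising the `--path` filter (substring, regex or wildcard) into a single compiled regex - a sketch only, the precedence rules here are assumptions:

```python
import fnmatch
import re

def path_filter_to_regex(path_filter: str) -> re.Pattern:
    """Turn the --path argument into one compiled regex (sketch)."""
    if any(c in path_filter for c in "*?["):
        # shell-style wildcard -> anchored regex
        return re.compile(fnmatch.translate(path_filter))
    try:
        # anything that compiles as a regex is used directly
        return re.compile(path_filter)
    except re.error:
        # otherwise treat the value as a plain substring
        return re.compile(re.escape(path_filter))

# e.g. path_filter_to_regex('/badc/cmip6/*.nc').match('/badc/cmip6/tas.nc')
```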

### Update the holding metadata
- `meta`
- Required arguments
- `--user=`
  - One of the following (must uniquely identify the holding)
- `--holding_id=`
- `--label=`
- Optional
- `--group`
- `--update_tag=key:value` (create or amend a tag)
- `--update_label=` (change the label)

# Development notes

**Transfer**
Need separate get and put consumers
- Probably good to have a base-transfer processor that does the core work
(verification of message, creation of minio client etc.) and then split the
transfer work into two child classes.
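
A rough sketch of what that split might look like - class and method names are illustrative, not the actual NLDS classes:

```python
from minio import Minio

class BaseTransferConsumer:
    """Base class owns the common work: message verification and
    MinIO client creation. Children do the actual transfer."""
    def __init__(self, tenancy: str, access_key: str, secret_key: str):
        self.client = Minio(tenancy, access_key=access_key,
                            secret_key=secret_key, secure=True)

    def verify_message(self, body: dict) -> None:
        for field in ("transaction_id", "user", "group"):
            if field not in body.get("details", {}):
                raise ValueError(f"missing required field: {field}")

    def transfer(self, body: dict) -> None:
        raise NotImplementedError

class PutTransferConsumer(BaseTransferConsumer):
    def transfer(self, body: dict) -> None:
        ...  # POSIX disk -> object storage

class GetTransferConsumer(BaseTransferConsumer):
    def transfer(self, body: dict) -> None:
        ...  # object storage -> POSIX disk
```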

Check file permissions in the transfer processor too (a sketch of the check is
given below), but add the ability to configure both the transfer processor and
the indexer to skip this check, to reduce iteration time.
- Would be worth benchmarking this at some point!
- Should we be checking filelist length here and reindexing / resizing the list
if too long?
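
A minimal sketch of the per-user permission check, assuming we have the requesting user's uid and gids to hand - the real implementation may differ:

```python
import os
import stat

def user_has_read_access(filepath: str, uid: int, gids: list[int]) -> bool:
    """Can the given uid/gids read this file? (owner, group, then other bits)"""
    try:
        st = os.stat(filepath)
    except OSError:
        return False
    mode = st.st_mode
    if st.st_uid == uid:
        return bool(mode & stat.S_IRUSR)
    if st.st_gid in gids:
        return bool(mode & stat.S_IRGRP)
    return bool(mode & stat.S_IROTH)
```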

Currently the buckets are designated by the transaction ID; there is probably a
better way of doing this - it feeds into the catalogue design.
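
For reference, a sketch of the current scheme using the MinIO Python client - the bucket-name prefix is an assumption:

```python
from minio import Minio

def put_file_to_bucket(client: Minio, transaction_id: str, object_name: str,
                       file_path: str) -> None:
    """One bucket per transaction, named after the transaction ID
    (bucket naming may change with the catalogue design)."""
    bucket_name = f"nlds.{transaction_id}"   # prefix is an assumption
    if not client.bucket_exists(bucket_name):
        client.make_bucket(bucket_name)
    client.fput_object(bucket_name, object_name, file_path)
```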

**Indexer**
## Indexer
Sym-links and directories need to be preserved in the backup, this is still
being worked out but will need implementing! Symlinks with common path - i.e.
that point to a file/directory within the scope of the original batch - need to
be converted to be relative so that restoring the files in a different directory
structure works. Similarly, symlinks to locations not on the common path need to
be preserved as absolute links.
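
A possible implementation of that rule, assuming absolute paths and a known common path for the batch - a sketch only:

```python
import os

def stored_link_target(link_path: str, common_path: str) -> str:
    """Links pointing inside the batch's common path are stored relative to
    the link's own directory, so they survive a restore into a different
    directory structure; links pointing elsewhere stay absolute."""
    target = os.path.realpath(link_path)   # resolve the symlink
    if os.path.commonpath([target, common_path]) == os.path.normpath(common_path):
        return os.path.relpath(target, start=os.path.dirname(link_path))
    return target
```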

Test the permissions properly, with a sudoers file specifying a user which can
change to other users (a super user).
## Monitoring
The database for monitoring is quite simple (I think); we only require:
- transaction_id
- holding_id (when one exists)
- tag (if one exists)
- state
- user
- message_splits

We can put this in a table called `transaction_states`.

#### What states do we want?
We probably want, at least:
1. Routing (sent from API-server?)
2. Splitting
3. Indexing
4. Transferring
5. Cataloging
6. Complete
7. Failed (just thought of this!)
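
A sketch of these as a Python `IntEnum`, ordered so that 'later' states compare greater - the ordering (and FAILED sorting last) is an assumption that makes the ratcheting idea below straightforward:

```python
from enum import IntEnum

class State(IntEnum):
    """Candidate transaction states, in processing order."""
    ROUTING = 0
    SPLITTING = 1
    INDEXING = 2
    TRANSFERRING = 3
    CATALOGING = 4
    COMPLETE = 5
    FAILED = 6
```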

#### What to do about message splitting
Given that messages can be split into batches and by size in the indexer, we could have a single transaction be split across N jobs where N >> 1. This is a worst case obviously, but it is absolutely necessary to be prepared for it.

We can keep track of how many splits have occurred in the indexer, both at the 'split' step and at the 'index' step. We could have a `message_split_count` as part of the database record. We could in fact have a second table of transaction_splits with a 1-to-many relationship and give each of these a separate state, but this feels a little onerous - we're trying to hold as little state as possible and this would require each individual message to keep track of how many times it has been split. Not to mention the splits table could get VERY big VERY quickly - we would need to purge it roughly 2 weeks after we're sure that a transaction has completed.

What we could do instead is have a table of state counts, which keeps track of how many times each state has been reported for each transaction. For example, for a message that has been split 6 times, we would have a state_count for each of the states that is incremented whenever a new state-update message comes in; we then know a transaction has completely finished when the state_count == the split count. This is potentially problematic though, as it could lead to a race condition - if two messages arrive at very nearly the same time and try to increment the counter simultaneously then we could lose a count and the job would be stuck forever in 'not enough counts for state X'.

Another alternative is to have the state be updated as soon as a _new_ state comes in. This gets around race conditions as it makes the overall database value matter less. Two messages arriving with conflicting states could race but the state is essentially ratcheting up - the later state will come out on top eventually. In fact all jobs will reach a COMPLETE state eventually as the final job going into the COMPLETE state will not have anything else to compete against. However, the state could therefore be unrepresentative of the actual state of the system.
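
Reusing the `State` enum sketched above, the ratchet is then just a `max`:

```python
def ratchet_state(current: State, incoming: State) -> State:
    """The recorded state only ever moves forward: a message for an earlier
    stage arriving late cannot wind the transaction back, and the last job to
    report COMPLETE leaves the transaction in COMPLETE."""
    return max(current, incoming)

# e.g. ratchet_state(State.TRANSFERRING, State.INDEXING) -> State.TRANSFERRING
```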

We could get around race conditions entirely by requiring that there only be one monitoring queue with a prefetch of 1. There would then only ever be one read or write to the database at a time, and access would definitely be serial. This might be a bottleneck though, especially if we're reading and writing to the database multiple times for each transaction at each step of the process. Say we have 1000 transactions, each split into 1000 subjobs, each writing to the database at each stage of the put process. That would be $5\times1000\times1000 = 5,000,000$ individual database writes just for that clump of jobs.

I think, on balance, this volume of writes probably isn't a problem (can discuss with Neil obviously) so I'm going to press ahead with a design which has both a ratcheting job state and individual transaction_state_counts, with a queue-prefetch of 1. Therefore we want a single table with the following:
1. id [INT]
2. transaction_id [UUID | STRING] (unique)
3. user [STRING] (uid?)
4. group [STRING] (gid?)
5. (ratcheted) state [ENUM | INT]
6. split_count [INT] (set at split & index step)
7. indexing_count [INT]
8. transferring_count [INT]
9. cataloging_count [INT]
10. complete_count [INT]
11. failed_count [INT]

with the transaction_id either given, or obtained from a tag/holding_id via the catalog if requesting that way. Note that requesting by tag will therefore not work if the job has not reached the cataloging step yet, although a job will not have a holding_id or tag to request by until that point anyway.
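
A sketch of that table as a SQLAlchemy model (SQLAlchemy is an assumption - this spec only lists the columns), reusing the `State` enum from above:

```python
from sqlalchemy import Column, Integer, String, Enum
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class TransactionState(Base):
    """Proposed monitoring table - column names taken from the list above."""
    __tablename__ = "transaction_states"

    id = Column(Integer, primary_key=True)
    transaction_id = Column(String, unique=True)   # UUID stored as a string
    user = Column(String)                          # uid?
    group = Column("group", String)                # gid?; quoted: GROUP is an SQL keyword
    state = Column(Enum(State))                    # ratcheted state, see above
    split_count = Column(Integer, default=0)       # set at split & index step
    indexing_count = Column(Integer, default=0)
    transferring_count = Column(Integer, default=0)
    cataloging_count = Column(Integer, default=0)
    complete_count = Column(Integer, default=0)
    failed_count = Column(Integer, default=0)
```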

An open question remains about timestamps - do we need one? The ingest time will be saved for any given catalog record, but recording the timestamp of the most recent change to the furthest state might be useful. The whole idea of this is to be as slim as possible though, so it might be best to just leave it out, given it doesn't provide that much benefit.

### On database visibility for monitoring requests
Neil made the good point that having the API-server talk directly to the databases might be bad for the database if traffic levels get too high, i.e. the server or the database would be in danger of clogging up the whole system. A better approach would be to have the API-server be a consumer with its own queue, and use an asynchronous `await` command from within a given 'router' function block (i.e. one of the router endpoints) to ensure that the information can be sent back to the user with the API HTTP response. We need to do some investigation to work out whether this is possible.

**Initial thoughts:**
- The awaiting thread will need to be picked back up by the consumer callback on the API-server - I am not sure how this works!
- Could call an async `send_and_receive` function within the router function which creates a consumer, sends a message to the exchange and then waits for a response (for some timeout value...)
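
A sketch of what such a `send_and_receive` could look like using the standard RabbitMQ RPC pattern (exclusive reply queue + correlation id). The library (`aio_pika`), exchange name and routing details are assumptions, not the NLDS design:

```python
import asyncio
import uuid
import aio_pika   # assumption: an async AMQP client; the spec names no library

async def send_and_receive(body: bytes, routing_key: str,
                           amqp_url: str, timeout: float = 30.0) -> bytes:
    """Publish to the exchange, then await a reply on a private queue so the
    result can be returned with the HTTP response (sketch only)."""
    connection = await aio_pika.connect_robust(amqp_url)
    async with connection:
        channel = await connection.channel()
        # server-named, exclusive reply queue for this one request
        reply_queue = await channel.declare_queue(exclusive=True)
        correlation_id = str(uuid.uuid4())
        loop = asyncio.get_running_loop()
        future: asyncio.Future = loop.create_future()

        async def on_reply(message) -> None:
            # match the reply to this request and wake the awaiting router
            if message.correlation_id == correlation_id and not future.done():
                future.set_result(message.body)

        await reply_queue.consume(on_reply, no_ack=True)
        exchange = await channel.declare_exchange(
            "nlds", aio_pika.ExchangeType.TOPIC, durable=True)
        await exchange.publish(
            aio_pika.Message(body=body, correlation_id=correlation_id,
                             reply_to=reply_queue.name),
            routing_key=routing_key)
        # the timeout guards against a consumer that never replies
        return await asyncio.wait_for(future, timeout=timeout)
```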
Binary file modified docs/spec/uml/catalog_db.png
29 changes: 19 additions & 10 deletions docs/spec/uml/catalog_db.puml
@@ -1,18 +1,26 @@
@startuml catalog_db

object "**Holding**" as holding {
id [INT]
transaction_id [UUID_64|STRING]
id [INT](unique)
label [STRING](unique with user)
user [STRING]
group [STRING]
}

object "**Transaction**" as transaction {
id [INT](unique)
transaction_id [UUID_64|STRING](unique)
ingest_time [DATETIME]
}

object "**Tag**" as tag {
id [INT]
tag [STRING]
id [INT](unique)
key [STRING]
value [STRING]
}

object "**File**" as file {
id [INT]
id [INT](unique)
original_path [STRING]
path_type [STRING]
link_path [STRING]
@@ -23,21 +31,22 @@ object "**File**" as file {
}

object "**Location**" as location {
id [INT]
id [INT](unique)
storage_type [OBJECT_STORAGE|TAPE]
root [STRING]
path [STRING]
access_time [DATETIME]
}

object "**Checksum**" as checksum {
id [INT]
checksum [STRING]
id [INT](unique)
checksum [STRING](unique with algorithm)
algorithm [STRING]
}

holding "1" *-- "many" file
transaction "1" *-- "many" file
holding "1" *-- "many" transaction
holding "1" *-- "many" tag
file "1" *-- "many" location
file "1" *-- "1" checksum
file "1" *-- "many" checksum
@enduml
Binary file added docs/spec/uml/deployment.png
58 changes: 58 additions & 0 deletions docs/spec/uml/deployment_2022-12-07.puml
@@ -0,0 +1,58 @@
@startuml deployment

actor user as "User"
package sci_machines as "sci Machines"{
agent client as "Client API / CLI" #MediumPurple
}

component posix as "POSIX disk"
cloud object_storage as "Object Storage"
component tape as "Tape"

package kubernetes as "Wigbiorg (Kubernetes)" {
package consumers as "Basic deployment" {
agent logger as "Logger" #hotpink
agent indexer as "Indexer" #LightBlue
agent transfer_put as "Put Transfer" #gold
agent nlds_worker as "NLDS Worker" #lightgrey
agent monitoring as "Monitor" #tomato
agent cataloguer as "Cataloguer" #springgreen
}
package nginx as "Load balanced deployment"{
agent "API Server" as api_server #DodgerBlue
}
}
package vms as "VMs"{
package cloud_vm_consumer as "Cloud VM /w write access"{
agent transfer_get as "Get Transfer" #gold
}

package cloud_vm_rabbit as "Cloud VM"{
agent rabbit_server as "Rabbit server" #DarkOrange
}
}
package db_server as "DB Server"{
database catalogue_db as "Catalogue DB" #springgreen
database monitoring_db as "Monitoring DB" #tomato
}

user-->client
client<-->api_server
' api_server-->rabbit_server
' message_broker <--> micro_service_1
' message_broker <--> micro_service_2
' message_broker <--> micro_service_3
' micro_service_3 --> monitoring
' micro_service_1 --> catalogue
' monitoring <--> api_server
' cataloguer <--> api_server
catalogue_db <--> cataloguer
monitoring_db <--> monitoring
' micro_service_3 <--> transfer
transfer_put --> object_storage
transfer_put <-- posix
transfer_get <-- object_storage
transfer_get --> posix
' transfer <--> tape

@enduml
12 changes: 6 additions & 6 deletions docs/spec/uml/message_flow_get1.puml
@@ -7,15 +7,15 @@ participant wex as "Exchange"

queue qw as "NLDS Q" #lightgrey
note over qw
topic = nlds-api.nlds.#
topic = nlds-api.nlds.*
end note
collections work as "NLDS\nWorker" #lightgrey

queue qc as "Catalog Q" #springgreen
collections catalog_get as "Cataloguers" #springgreen
database catalog_db as "Catalog DB" #springgreen
note over qc
topic = #.catalog-get.#
topic = *.catalog-get.*
end note

user -> client : GET(filelist,target,\n\tuser,group,id)
@@ -41,14 +41,14 @@ work -> wex : key=(nlds-api).catalog-get.start
deactivate work

activate wex
wex -> qc : key=(#).catalog-get.start
wex -> qc : key=(*).catalog-get.start
deactivate wex

activate qc
qc -> catalog_get : key=(#).catalog-get.start
qc -> catalog_get : key=(*).catalog-get.start
deactivate qc
note right of qc
(#) here will match the calling
(*) here will match the calling
application.
`nlds-api` in this case.
end note
@@ -59,7 +59,7 @@ loop #lightgrey for file in filelist
catalog_db -> catalog_get : TRUE || FALSE
deactivate catalog_db
end
catalog_get -> wex : key=(#).catalog-get.complete
catalog_get -> wex : key=(*).catalog-get.complete
deactivate catalog_get

@enduml
