2714 batch upsert #2739
Conversation
Looks good, other than the comment on adding a salt.
if (
    case.caseReference is not None
    and case.caseReference.sourceEntryId is not None
):
    # hash the upstream identifier before storing (call completed from context)
    case.caseReference.sourceEntryId = sha256(
        case.caseReference.sourceEntryId.encode("utf-8")
    ).hexdigest()
Should we add a salt here? Otherwise someone can sha256 all upstream source Ids and correlate
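To illustrate the concern (the ID below is made up), an unsalted hash can be recomputed by anyone who holds the upstream IDs:

```python
from hashlib import sha256

# An unsalted digest is deterministic: anyone who holds the upstream source
# entry IDs can recompute the hashes and join them back to our records.
upstream_id = "ES-2022-000123"  # hypothetical upstream source entry ID
print(sha256(upstream_id.encode("utf-8")).hexdigest())
# same input -> same digest, every time, for everyone
```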
yup good point
OK thinking about this it adds quite a bit of complexity that I'd like to discuss before committing to an implementation. It absolutely is the right thing to do, and I need to make sure I do it well :).
Salt description

Each time we create a case, we generate some random salt, compute the hash `H(salt + sourceEntryId)`, then store `salt + H` as the `sourceEntryId` in the document. (Actually something like `"G.hv1:" + salt + H`, so that we can easily identify whether an ID has been hashed, and can change the algorithm later when SHA256 is broken or too cheap.)
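A minimal sketch of that scheme, with the function name, salt length, and encoding all assumed rather than taken from the branch:

```python
import secrets
from hashlib import sha256

PREFIX = "G.hv1:"  # marks an ID as hashed and versions the algorithm

def anonymise_source_entry_id(source_entry_id: str) -> str:
    """Return PREFIX + salt + H(salt + source_entry_id)."""
    salt = secrets.token_hex(16)  # fresh random salt per case
    digest = sha256((salt + source_entry_id).encode("utf-8")).hexdigest()
    return f"{PREFIX}{salt}{digest}"

def is_anonymised(value: str) -> bool:
    return value.startswith(PREFIX)
```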
Problem 1: UI
The curator retrieves a case from the line list, edits it, then saves back. The service should not re-hash the source entry ID, because then it will not be seen as the same case.
Solution 1: _id

I think if a case already has an `_id` field we don't modify the `sourceEntryId`, on the basis that the document already came from the database so has been hashed already. If a case doesn't have an `_id` field, then its information is from an external source and should be anonymised.

This adds the constraint that even non-Mongo data stores need to use the `_id` field to store unique identifiers for cases, but that's an easy constraint to document and work with.
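In code the rule would look roughly like this (attribute names follow the snippet under review; the anonymisation helper is the hypothetical one sketched above):

```python
# Sketch of the rule, assuming a case model with optional _id and
# caseReference attributes, reusing the hypothetical helper above.
def maybe_anonymise(case) -> None:
    if getattr(case, "_id", None) is not None:
        return  # already came from the database, so already hashed
    ref = case.caseReference
    if ref is not None and ref.sourceEntryId is not None:
        ref.sourceEntryId = anonymise_source_entry_id(ref.sourceEntryId)
```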
Problem 2: batch upsert
A couple of different use cases rely on being able to upload a whole batch of cases, inserting ones that haven't been seen before and updating ones that have: bulk upload through the portal, and data ingestion. In that case, we need to ensure that no existing cases are duplicated.
Solution 1: compare sourceEntryId

For each case in the input data set, retrieve the salt for the `sourceEntryId` in each document with the same `sourceId` and compute the hash for the incoming case using that salt. If they match, we have the same case, and should update rather than insert.
The problem with this approach is that it's O(n^2) in number of cases for a given sourceId (for every new case, compare against every existing case). The benefit is it implements both the existing upsert behaviour and the source entry ID anonymisation.
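Sketched against the hashed-ID layout assumed above, the comparison would be something like:

```python
from hashlib import sha256

PREFIX = "G.hv1:"
SALT_LEN = 32  # hex characters, matching the 16-byte salt in the sketch above

def matches_existing(incoming_id: str, stored_id: str) -> bool:
    """Recover the salt from a stored hashed ID and compare digests."""
    if not stored_id.startswith(PREFIX):
        return incoming_id == stored_id  # legacy, unhashed value
    body = stored_id[len(PREFIX):]
    salt, digest = body[:SALT_LEN], body[SALT_LEN:]
    return sha256((salt + incoming_id).encode("utf-8")).hexdigest() == digest

def find_match(incoming_id: str, existing_ids: list[str]):
    # O(existing cases) per incoming case, hence O(n^2) for a whole batch.
    return next((s for s in existing_ids if matches_existing(incoming_id, s)), None)
```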
Solution 2: always insert

Cases are always treated as if they will be added to the data set. We add a `dropExisting` parameter to the batch upsert endpoint, which actually becomes the batch insert endpoint. If `dropExisting` is set, then the data service does a version of what we currently have with the `list` flag: it marks all existing cases for that source as to-be-deleted, inserts the new cases, then deletes the old ones. If it encounters a problem then it rolls back.

The ADI scripts for COVID-19 currently do batched uploads of 250 cases at once, to limit memory size and request size. As such, we couldn't literally have a parameter on a `batchUpsert` request: the first 250 cases would remove everything that's already stored. So we would need to have something stateful (presumably using the database) that mimics the current ADI behaviour: perhaps a "begin replacement upload" and "end replacement upload" pair (with some lifecycle management to deal with ingestion failing in the middle of the process).
The problems with this approach are that it changes the API contract of the data service—there is no more real batch upsert—and that it requires complex state management. The benefits are that it internalises behaviour we already need for ADI, and avoids the expensive case-by-case hash comparisons.
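For concreteness, the `dropExisting` path could look something like this against MongoDB (a sketch only; collection and flag names are invented):

```python
# Rough pymongo sketch of the mark/insert/delete behaviour; collection and
# field names are assumptions, not the existing implementation.
def replace_cases_for_source(cases, source_id, new_cases):
    cases.update_many(
        {"caseReference.sourceId": source_id},
        {"$set": {"pendingDelete": True}},
    )
    try:
        if new_cases:
            cases.insert_many(new_cases)
        cases.delete_many(
            {"caseReference.sourceId": source_id, "pendingDelete": True}
        )
    except Exception:
        # Roll back: remove anything just inserted, unmark the old cases.
        cases.delete_many(
            {"caseReference.sourceId": source_id, "pendingDelete": {"$exists": False}}
        )
        cases.update_many(
            {"caseReference.sourceId": source_id},
            {"$unset": {"pendingDelete": ""}},
        )
        raise
```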
@abhidg @jim-sheldon is there another solution here I'm missing? Preferences for how to deal with this?
There's another way -- always insert into parallel collections, and use another collection to switch out sets of cases instantaneously as in #2553.
Any way to avoid prune-like behaviour is desirable -- at least in MongoDB updating a field on millions of cases as a routine op is not going to scale.
OK so let me write that design up again to make sure that I've got the implications right. Cases are stored across multiple collections, with one collection per source. Presumably named after the sourceId, but for legibility let's give one a name: Spanish cases are in 'ES'. Now when I want to do a bulk upload that will replace all cases (rough sketch below):

- `startReplacement?source=ES` creates a new collection `ES_new` (with all the required indexes) and the data service records the fact that if it sees any `batchInsert`s for ES, it uses the `ES_new` collection. It carries on using the `ES` collection for queries (e.g. list, download).
- Repeatedly call `batchInsert` until all of the new cases are inserted. Hash the `sourceEntryId`s without reference to any existing data, because we assume that all cases are new.
- `endReplacement?source=ES` drops the existing `ES` collection, renames `ES_new` to `ES`, with the effect that now the application sees all the new data.
- `abortReplacement?source=ES` drops the new collection, as well as the stateful indicator that the data service should insert into that collection: everything goes back to using the existing collection.
- We definitely need every case to have a `caseReference` with a `sourceId`, because otherwise we don't know where to store it.
Is that what you mean @abhidg?
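Here is roughly how I picture that lifecycle mapping onto MongoDB (pymongo sketch; the `replacement_state` collection and all other names are invented for illustration):

```python
# Rough pymongo sketch of the replacement lifecycle; the real endpoints
# would live in the data service, not in a standalone script.
def start_replacement(db, source_id):
    staging = db[f"{source_id}_new"]
    staging.create_index("caseReference.sourceEntryId")  # plus the other required indexes
    db.replacement_state.update_one(
        {"sourceId": source_id}, {"$set": {"staging": staging.name}}, upsert=True
    )

def batch_insert(db, source_id, cases):
    state = db.replacement_state.find_one({"sourceId": source_id})
    target = db[state["staging"]] if state else db[source_id]
    if cases:
        target.insert_many(cases)

def end_replacement(db, source_id):
    # renameCollection with dropTarget swaps the new data in and removes the old.
    db[f"{source_id}_new"].rename(source_id, dropTarget=True)
    db.replacement_state.delete_one({"sourceId": source_id})

def abort_replacement(db, source_id):
    db[f"{source_id}_new"].drop()
    db.replacement_state.delete_one({"sourceId": source_id})
```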
Yes, I'm thinking it will be handled by parsing_lib as it is now after finalize_upload() https://github.com/globaldothealth/list/blob/main/ingestion/functions/common/parsing_lib.py#L517
Also where do manually-curated cases, which don't have an uploadID, live? In the early stages of an outbreak (Covid-19, monkeypox for example) cases are curated manually and the idea is that this would use the portal. If there's a collection per sourceId/uploadId then these cases would disappear.
> Yes, I'm thinking it will be handled by parsing_lib as it is now after finalize_upload() https://github.com/globaldothealth/list/blob/main/ingestion/functions/common/parsing_lib.py#L517
I'm thinking that there should be some internally consistent behaviour for the data service to follow, for situations where there isn't automatic ingestion or where there is still partially manual curation. The day zero behaviour will be more like MPXV today than like COVID-19 today.
And in fact the bulk upload was used in COVID-19 to upload the spreadsheet data, which is a single "source" but contains data from multiple countries. So I think having one collection per upload would make for some very awkward queries.
Thanks for the call @abhidg, we now have a resolution I will document here and then implement.
1. All cases must have a source ID, and there will be no source entry ID (therefore no hashing, and also no correlation with upstream data).
2. Upsert looks for the existence of `_id` (currently the Mongo object ID, but generically "an opaque identity token assigned by Global.health") on a case. If `_id` is set, then the case is a replacement; if not set, it is an insertion. (Sketched below.)
3. The curator service and ingestion library take care of building an atomic-looking data management platform on top of 1 and 2.
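A minimal sketch of point 2, assuming pymongo and dict-shaped cases that carry `_id` only when they were previously returned by Global.health:

```python
# Sketch of the agreed upsert rule; not the actual store-layer code.
def upsert_one(cases, case: dict) -> None:
    if case.get("_id") is not None:
        # Replacement: the case already has an identity assigned by Global.health.
        cases.replace_one({"_id": case["_id"]}, case, upsert=True)
    else:
        # Insertion: the case is new to the system.
        cases.insert_one(case)
```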
Ensure source entry is always anonymised
Also adds error checking in batch upsert that catches the exception raised in the test failure.
This obviates the problem with trying to keep hashed source entry IDs stable. See discussion in #2739
Hi @abhidg I have implemented the changes discussed yesterday: upsert checks whether a case has an `_id`.
This adds the batchUpsert endpoint, which is how bulk uploads and automatic ingestion are supported. As previously discussed with @abhidg this doesn't include curator or revision metadata, because they're unused in the Covid-19 version.
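For orientation, a batch upsert handler following the `_id` rule above might be shaped like this (a Flask-style sketch with illustrative route and field names, not the code in this PR):

```python
# Hypothetical shape of the endpoint; the real handler delegates to the store layer.
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route("/api/cases/batchUpsert", methods=["POST"])
def batch_upsert():
    cases = (request.get_json() or {}).get("cases", [])
    created = updated = 0
    for case in cases:
        # Presence of _id decides replacement vs insertion (see discussion above).
        if case.get("_id"):
            updated += 1
        else:
            created += 1
    return jsonify({"numCreated": created, "numUpdated": updated}), 200
```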