feat: add recoverer for failed tasks
#3821
Conversation
gcp/workers/recoverer/recoverer.py
```python
def handle_gcs_retry(message: pubsub_v1.types.PubsubMessage) -> bool:
  """Handle a failed GCS write."""
  # Check that the record hasn't been written/updated in the meantime.
```
Move the comment down to the line where the check actually takes place
```python
  if not vuln_id:
    logging.error('gcs_missing: message missing id attribute: %s', message)
    return True
  # Re-put the Bug to regenerate the GCS & Datastore entities
```
Should we check if it has been added to GCS before re-putting?
In this case, it's ok if we don't, since the re-put will completely regenerate the bug with fresh data.
```python
  alias_group = osv.AliasGroup.query(
      osv.AliasGroup.bug_ids == vuln_id).get()
  if alias_group is None:
    aliases = []
    aliases_modified = datetime.datetime.now(datetime.UTC)
  else:
    aliases = sorted(set(alias_group.bug_ids) - {vuln_id})
    aliases_modified = alias_group.last_modified
  # Only update the modified time if it's actually being modified
  if vuln_proto.aliases != aliases:
    vuln_proto.aliases[:] = aliases
    if aliases_modified > modified:
      modified = aliases_modified
    else:
      modified = datetime.datetime.now(datetime.UTC)
```
Probably fine to leave here for now, but is this code the same as the code in the alias group service? We probably should extract it.
Yeah - it's pretty similar to alias/upstream.
It's hard to share code between these two at the moment due to how the dockerfiles are built.
Adds a cron job to validate the consistency of records between GCS and Datastore with the new database format (#3850). This sends messages to the recoverer (#3821) to attempt to repair them. The record-checker stores information from its previous runs in Datastore in a new JobData entity, where the Datastore Key is the name of the metadata and the value is stored in a `value` property.

I've written this in Go because it's fairly simple and to set a precedent for writing / migrating other components away from Python. I made a copy of the `logging` submodule from `vulnfeeds` into the new toplevel `go` directory - we'll want to consolidate these soon. There's also room for refactoring the record-checker code to have some reusable components for e.g. setting up Datastore, GCS, or Pub/Sub.

I added a validation test script to make sure Datastore entities written in Python are compatible with the Go definitions, and vice versa.
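The JobData layout described above (key name identifies the metadata, payload in a single `value` property) can be illustrated with a small sketch — a plain dict stands in for Datastore here, and the helper names are hypothetical:

```python
from dataclasses import dataclass


@dataclass
class JobData:
  # The Datastore key name says which piece of metadata this is
  # (e.g. a last-run timestamp); the payload lives in `value`.
  name: str
  value: str


def put_job_data(store, name, value):
  """Upsert a JobData entity keyed by its metadata name."""
  store[name] = JobData(name=name, value=value)


def get_job_data(store, name, default=None):
  """Fetch the `value` property, or default if the entity is absent."""
  entity = store.get(name)
  return entity.value if entity is not None else default
```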
Added a new `recoverer` (open to renaming) GKE worker in charge of recovering from database writing errors. It subscribes to the existing `failed-tasks` topic (which is the dead-letter topic for the importer/worker `tasks` topic - I'm pretty sure those were being discarded since it had no subscriptions). The recoverer will attempt to rectify these errors.

Added a `get_google_cloud_project()` function that multiple things end up using.

Honestly, I have no idea how much of this code is actually going to run in practice - we haven't really seen any of these GCS errors on staging as of yet. Maybe this will never end up running 🙃
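A worker consuming a dead-letter topic typically routes each message to a type-specific handler. A minimal sketch, assuming a hypothetical `type` message attribute and handlers that (like `handle_gcs_retry` in the diff) return True to ack the message:

```python
def dispatch(message, handlers):
  """Route a dead-lettered message to a recovery handler.

  handlers maps a task-type attribute to a callable that returns True to
  ack (drop) the message or False to nack it for redelivery. `message`
  is anything with an `attributes` mapping.
  """
  task_type = message.attributes.get('type')
  handler = handlers.get(task_type)
  if handler is None:
    # Unknown task type: ack it so it doesn't redeliver forever.
    return True
  return handler(message)
```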
In another PR, I'll make a cron job that goes through all the recently-updated `Vulnerability` entities to make sure they have GCS records associated with them (sending problem entities to the `recoverer` to fix).
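That consistency check could boil down to comparing the two views of each record. A rough sketch, assuming both sides are summarized as ID-to-last-modified mappings (function name and reason strings are hypothetical):

```python
def check_consistency(datastore_records, gcs_records):
  """Compare Datastore and GCS views of the records.

  Both arguments map vuln ID -> last-modified datetime. Yields
  (vuln_id, reason) pairs for records the recoverer should repair.
  """
  for vuln_id, ds_modified in datastore_records.items():
    gcs_modified = gcs_records.get(vuln_id)
    if gcs_modified is None:
      yield vuln_id, 'missing_from_gcs'
    elif gcs_modified < ds_modified:
      yield vuln_id, 'stale_in_gcs'
  for vuln_id in gcs_records:
    if vuln_id not in datastore_records:
      yield vuln_id, 'missing_from_datastore'
```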