-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
boltdb shipper download failure handling and some refactorings #2156
boltdb shipper download failure handling and some refactorings #2156
Conversation
598007e
to
b92a54a
Compare
Codecov Report
@@ Coverage Diff @@
## master #2156 +/- ##
==========================================
- Coverage 62.14% 62.13% -0.02%
==========================================
Files 154 155 +1
Lines 12457 12509 +52
==========================================
+ Hits 7741 7772 +31
- Misses 4108 4131 +23
+ Partials 608 606 -2
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a lot of questions about this PR. Some of them are due to my unfamiliarity with logic in this package, but others are concerns for some of the design choices taken. Here's a few high level things that help make code more intuitive, safe, and extensible in my opinion:
- In general struct methods should be safe for use. If you're wrapping a state machine, the method shouldn't panic, etc, regardless of which state is held internally. You can return errors when needed, etc.
Example:
func (fc *filesCollection) cleanupFile(fileName string) error {
boltdbFile := fc.files[fileName].boltdb
filePath := boltdbFile.Path()
if err := boltdbFile.Close(); err != nil {
return err
}
delete(fc.files, fileName)
return os.Remove(filePath)
}
This is dangerous because it depends on the file existing in the map and being non-nil. Furthermore it also depends on the embedded *bbolt.DB
from being non-nil. If any of these are not true, it panics.
- Locks should be as unobtrusive as possible and ideally be colocated with the data structures they protect. They should avoid forcing consumers of the library from maintaining the library structure's state. For instance, the lock embedded in the
*filesCollection
should be used by it's own methods and not expect consumers of the collection to manage it's state. Instead of
fc.Lock()
fc.iter(...)
fc.Unlock()
, the fc.iter
method shouldl take care of the locking/unlocking logic.
- Struct methods shouldn't require you to know the exact implementation internally. Instead, we should be able to read the exported functions/method names and start using it. This is important in order to maintain an extensible codebase. We want to make sure that adding functionality in one pkg shouldn't require that you read every dependency that pkg has (and every dependency those dependencies have, recursively) in order to work on it. We want to make adding to the codebase an O(1) or O(n log n) operation, not O(n) or O(n^2).
return nil, fc.err | ||
} | ||
|
||
return fc.files[fileName], nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should return an error if the file does not exist -- (nil, nil) seems like an antipattern here and the consumer would need to check both file
& error
on every call.
fc.lastUsedAt = time.Now() | ||
} | ||
|
||
func (fc *filesCollection) lastUsedTime() time.Time { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems a little weird to have private getters that reference private fields, but it doesn't affect functionality. Since private methods are pkg-scoped anyways, these functions could all be replaced via fc.lastUsedAt
.
pkg/storage/stores/local/shipper.go
Outdated
@@ -260,20 +249,14 @@ func (s *Shipper) syncLocalWithStorage(ctx context.Context) (err error) { | |||
|
|||
// deleteFileFromCache removes a file from cache. | |||
// It takes care of locking the filesCollection, closing the boltdb file and removing the file from cache | |||
func (s *Shipper) deleteFileFromCache(period, uploader string, fc *filesCollection) error { | |||
func (s *Shipper) deleteFileFromCache(uploader string, fc *filesCollection) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like an antipattern to have this method just call lock/unlock then defer to the unexported cleanupFile
method. Why not instead have the *filesCollection
manage it's own mutex?
} | ||
|
||
func (s *Shipper) forEach(ctx context.Context, period string, callback func(db *bbolt.DB) error) error { | ||
func (s *Shipper) forEach(period string, callback func(db *bbolt.DB) error) error { | ||
s.downloadedPeriodsMtx.RLock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain what's happening here? It tries to see if the period exists twice in a row? Why do we check twice hoping it's changed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It works in following manner:
- Check with read-only lock whether we can find the entry.
- If not with write lock check whether it got created by some other goroutine due to a race because multiple goroutines might be trying to acquire the write lock and create it.
- If it is still not there create it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It works in following manner:
Check with read-only lock whether we can find the entry.
If not with write lock check whether it got created by some other goroutine due to a race because multiple goroutines might be trying to acquire the write lock and create it.
If it is still not there create it.
Thanks. Would you mind adding this as a comment to the code here?
pkg/storage/stores/local/shipper.go
Outdated
defer s.downloadedPeriodsMtx.Unlock() | ||
|
||
// cleaning up fc since it is in invalid state anyways. | ||
delete(s.downloadedPeriods, period) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of deleting it here, we should only add it to the downloadedPeriods
after it successfully downloads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done like this to hold other requests needing the same index which is not downloaded yet to wait on mutex to get unlocked. The first request which gets to hold a lock gets to download the files. Does it make sense?
toDownload = append(toDownload, object) | ||
} | ||
} | ||
|
||
for uploader := range fc.files { | ||
err = fc.iter(func(uploader string, df *downloadedFile) error { | ||
if _, isOK := listedUploaders[uploader]; !isOK { | ||
toDelete = append(toDelete, uploader) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this means we'd delete when if uploader == s.uploader
, is this correct?
b624e5f
to
a35c4cc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for all these changes. I have a few nits, but I think it's getting a lot closer. Nice work!
|
||
toDownload, toDelete, err := fc.checkStorageForUpdates(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, storageObject := range toDownload { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there should be some bounded concurrency mechanism for downloads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is in my todo and I think it deserves a separate PR. What do you think?
for _, uploader := range toDelete { | ||
err := s.deleteFileFromCache(period, uploader, fc) | ||
err := fc.cleanupFile(uploader) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would be better expressed as fc.CleanupFiles(toDelete...)
, which could then be refactored like
func (fc *FilesCollection) CleanupAllFiles(files ...string)
This cleanup function already takes the lock in the exact same way, so we'd be able to increase code reuse and be more idiomatic.
return nil | ||
} | ||
|
||
func (fc *FilesCollection) CleanupAllFiles() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As expressed earlier, I think this can be changed to
func (fc *FilesCollection) CleanupFiles(files ...string)
@@ -212,18 +209,18 @@ func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc | |||
|
|||
totalFilesSize += stat.Size() | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the lock can be held here rather than implicitly require the caller to do so.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am taking the lock from the caller because this function is called in a goroutine. The goroutine could be waiting to be scheduled and some other goroutine could take the lock. While I am checking the ready
channel in reads but it would be error-prone to rely only on that because someone else might forget to check the channel. The contract is simple now, all the public methods take care of locking.
|
||
// cleaning up files due to error to avoid returning invalid results. | ||
for fileName := range fc.files { | ||
if err := fc.cleanupFile(fileName); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Locks aren't obtained for cleanup (they technically are when this is called via NewFilesCollection
, but I think it's better design to use the CleanupFIles(filename)
method I proposed earlier).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, as mentioned earlier, all the public methods take care of locking. I did it this way to do all the operations of initializing the filesCollection
or cleaning up due to failures without releasing the locks. This way it is straightforward and less error-prone.
} | ||
|
||
func (s *Shipper) forEach(ctx context.Context, period string, callback func(db *bbolt.DB) error) error { | ||
func (s *Shipper) forEach(period string, callback func(db *bbolt.DB) error) error { | ||
s.downloadedPeriodsMtx.RLock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It works in following manner:
Check with read-only lock whether we can find the entry.
If not with write lock check whether it got created by some other goroutine due to a race because multiple goroutines might be trying to acquire the write lock and create it.
If it is still not there create it.
Thanks. Would you mind adding this as a comment to the code here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sandeep and I had a discussion around this PR, there are numerous other changes coming into this same space soon to chnage how index files are uploaded (no longer will we re-upload a file, instead only new files).
We agreed to merge this PR as is and address some of the remaining concerns with that work:
- Add more testing, and add it up front when the development is done
- Separate the upload/download functionality from the index type (shipper/receiver shouldn't care if it's boltdb or some other index)
- Re-evaluate the locks in use to make sure API calls on a struct are side-effect free.
Thanks Cyril/Owen/Sandeep for the discussions and support around this PR!
What this PR does / why we need it:
Previously boltdb shipper was using context from request for initial downloading of files which means if a request times out download operation would be cancelled as well and we will continue using whatever files were downloading until the context got cancelled. This would result in incorrect results until files get resynced. This PR takes care of it by passing the background context instead. This means files would continue to download even though request got timed out. We would anyways need those files for serving next requests.
This PR also does some code refactoring to make things clearer at some of the places and support the changes which handle the download failure issue.