diff --git a/go.mod b/go.mod index af245320ec..d83dd73ce5 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.18 require ( codeberg.org/gruf/go-errors v1.0.5 + codeberg.org/gruf/go-mutexes v1.1.2 codeberg.org/gruf/go-runners v1.2.0 codeberg.org/gruf/go-store v1.3.6 github.com/ReneKroon/ttlcache v1.7.0 @@ -31,7 +32,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.10.0 github.com/stretchr/testify v1.7.0 - github.com/superseriousbusiness/activity v1.0.1-0.20220405135100-18e8f86a760a + github.com/superseriousbusiness/activity v1.1.0-gts github.com/superseriousbusiness/exif-terminator v0.2.0 github.com/superseriousbusiness/oauth2/v4 v4.3.2-SSB github.com/tdewolff/minify/v2 v2.9.22 @@ -54,7 +55,6 @@ require ( codeberg.org/gruf/go-fastpath v1.0.2 // indirect codeberg.org/gruf/go-format v1.0.3 // indirect codeberg.org/gruf/go-hashenc v1.0.1 // indirect - codeberg.org/gruf/go-mutexes v1.1.2 // indirect codeberg.org/gruf/go-pools v1.0.2 // indirect github.com/aymerick/douceur v0.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 88fed3a0d6..2eb96ff487 100644 --- a/go.sum +++ b/go.sum @@ -535,8 +535,8 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= -github.com/superseriousbusiness/activity v1.0.1-0.20220405135100-18e8f86a760a h1:tKr18ijUgZ+PCM/n+St6uO2BaPgkReRtM3IJHC/Otf4= -github.com/superseriousbusiness/activity v1.0.1-0.20220405135100-18e8f86a760a/go.mod h1:AZw0Xb4Oju8rmaJCZ21gc5CPg47MmNgyac+Hx5jo8VM= +github.com/superseriousbusiness/activity v1.1.0-gts h1:BSnMzs/84s0Zme7BngE9iJAHV7g1Bv1nhLCP0aJtU3I= +github.com/superseriousbusiness/activity v1.1.0-gts/go.mod h1:AZw0Xb4Oju8rmaJCZ21gc5CPg47MmNgyac+Hx5jo8VM= github.com/superseriousbusiness/exif-terminator v0.2.0 h1:C21KOUr54E37qTqYS7WJX0J83sNzzCwBEy0KXyDprqU= github.com/superseriousbusiness/exif-terminator v0.2.0/go.mod h1:DHJuKguXqyOVqB/oyOylutEDIZCbkYsn2GZFNSUDT9E= github.com/superseriousbusiness/go-jpeg-image-structure/v2 v2.0.0-20220321154430-d89a106fdabe h1:ksl2oCx/Qo8sNDc3Grb8WGKBM9nkvhCm25uvlT86azE= diff --git a/internal/federation/federatingdb/db.go b/internal/federation/federatingdb/db.go index 5de7fa607f..36df2593f0 100644 --- a/internal/federation/federatingdb/db.go +++ b/internal/federation/federatingdb/db.go @@ -20,9 +20,8 @@ package federatingdb import ( "context" - "sync" - "time" + "codeberg.org/gruf/go-mutexes" "github.com/superseriousbusiness/activity/pub" "github.com/superseriousbusiness/activity/streams/vocab" "github.com/superseriousbusiness/gotosocial/internal/db" @@ -41,9 +40,7 @@ type DB interface { // FederatingDB uses the underlying DB interface to implement the go-fed pub.Database interface. // It doesn't care what the underlying implementation of the DB interface is, as long as it works. type federatingDB struct { - mutex sync.Mutex - locks map[string]*mutex - pool sync.Pool + locks mutexes.MutexMap db db.DB typeConverter typeutils.TypeConverter } @@ -51,29 +48,9 @@ type federatingDB struct { // New returns a DB interface using the given database and config func New(db db.DB) DB { fdb := federatingDB{ - mutex: sync.Mutex{}, - locks: make(map[string]*mutex, 100), - pool: sync.Pool{New: func() interface{} { return &mutex{} }}, + locks: mutexes.NewMap(-1, -1), // use defaults db: db, typeConverter: typeutils.NewConverter(db), } - go fdb.cleanupLocks() return &fdb } - -func (db *federatingDB) cleanupLocks() { - for { - // Sleep for a minute... - time.Sleep(time.Minute) - - // Delete unused locks from map - db.mutex.Lock() - for id, mu := range db.locks { - if !mu.inUse() { - delete(db.locks, id) - db.pool.Put(mu) - } - } - db.mutex.Unlock() - } -} diff --git a/internal/federation/federatingdb/lock.go b/internal/federation/federatingdb/lock.go index 22f2bb77aa..e3da99dd6b 100644 --- a/internal/federation/federatingdb/lock.go +++ b/internal/federation/federatingdb/lock.go @@ -22,10 +22,6 @@ import ( "context" "errors" "net/url" - "sync" - "sync/atomic" - - "github.com/sirupsen/logrus" ) // Lock takes a lock for the object at the specified id. If an error @@ -39,83 +35,10 @@ import ( // processes require tight loops acquiring and releasing locks. // // Used to ensure race conditions in multiple requests do not occur. -func (f *federatingDB) Lock(c context.Context, id *url.URL) error { - // Before any other Database methods are called, the relevant `id` - // entries are locked to allow for fine-grained concurrency. - - // Strategy: create a new lock, if stored, continue. Otherwise, lock the - // existing mutex. - if id == nil { - return errors.New("Lock: id was nil") - } - idStr := id.String() - - // Acquire map lock - f.mutex.Lock() - - // Get mutex, or create new - mu, ok := f.locks[idStr] - if !ok { - mu, ok = f.pool.Get().(*mutex) - if !ok { - logrus.Panic("Lock: pool entry was not a *mutex") - } - f.locks[idStr] = mu - } - - // Unlock map, acquire mutex lock - f.mutex.Unlock() - mu.Lock() - return nil -} - -// Unlock makes the lock for the object at the specified id available. -// If an error is returned, the lock must have still been freed. -// -// Used to ensure race conditions in multiple requests do not occur. -func (f *federatingDB) Unlock(c context.Context, id *url.URL) error { - // Once Go-Fed is done calling Database methods, the relevant `id` - // entries are unlocked. +func (f *federatingDB) Lock(c context.Context, id *url.URL) (func(), error) { if id == nil { - return errors.New("Unlock: id was nil") + return nil, errors.New("Lock: id was nil") } - idStr := id.String() - - // Check map for mutex - f.mutex.Lock() - mu, ok := f.locks[idStr] - f.mutex.Unlock() - - if !ok { - return errors.New("missing an id in unlock") - } - - // Unlock the mutex - mu.Unlock() - return nil -} - -// mutex defines a mutex we can check the lock status of. -// this is not perfect, but it's good enough for a semi -// regular mutex cleanup routine -type mutex struct { - mu sync.Mutex - st uint32 -} - -// inUse returns if the mutex is in use -func (mu *mutex) inUse() bool { - return atomic.LoadUint32(&mu.st) == 1 -} - -// Lock acquire mutex lock -func (mu *mutex) Lock() { - mu.mu.Lock() - atomic.StoreUint32(&mu.st, 1) -} - -// Unlock releases mutex lock -func (mu *mutex) Unlock() { - mu.mu.Unlock() - atomic.StoreUint32(&mu.st, 0) + unlock := f.locks.Lock(id.String()) + return unlock, nil } diff --git a/vendor/github.com/superseriousbusiness/activity/pub/database.go b/vendor/github.com/superseriousbusiness/activity/pub/database.go index 8d3bbd4654..5c58c60cec 100644 --- a/vendor/github.com/superseriousbusiness/activity/pub/database.go +++ b/vendor/github.com/superseriousbusiness/activity/pub/database.go @@ -19,12 +19,7 @@ type Database interface { // processes require tight loops acquiring and releasing locks. // // Used to ensure race conditions in multiple requests do not occur. - Lock(c context.Context, id *url.URL) error - // Unlock makes the lock for the object at the specified id available. - // If an error is returned, the lock must have still been freed. - // - // Used to ensure race conditions in multiple requests do not occur. - Unlock(c context.Context, id *url.URL) error + Lock(c context.Context, id *url.URL) (unlock func(), err error) // InboxContains returns true if the OrderedCollection at 'inbox' // contains the specified 'id'. // diff --git a/vendor/github.com/superseriousbusiness/activity/pub/federating_wrapped_callbacks.go b/vendor/github.com/superseriousbusiness/activity/pub/federating_wrapped_callbacks.go index 5115d95b1c..fffee8b81b 100644 --- a/vendor/github.com/superseriousbusiness/activity/pub/federating_wrapped_callbacks.go +++ b/vendor/github.com/superseriousbusiness/activity/pub/federating_wrapped_callbacks.go @@ -263,11 +263,12 @@ func (w FederatingWrappedCallbacks) create(c context.Context, a vocab.ActivitySt if err != nil { return err } - err = w.db.Lock(c, id) + var unlock func() + unlock, err = w.db.Lock(c, id) if err != nil { return err } - defer w.db.Unlock(c, id) + defer unlock() if err := w.db.Create(c, t); err != nil { return err } @@ -304,11 +305,12 @@ func (w FederatingWrappedCallbacks) update(c context.Context, a vocab.ActivitySt if err != nil { return err } - err = w.db.Lock(c, id) + var unlock func() + unlock, err = w.db.Lock(c, id) if err != nil { return err } - defer w.db.Unlock(c, id) + defer unlock() if err := w.db.Update(c, t); err != nil { return err } @@ -341,11 +343,12 @@ func (w FederatingWrappedCallbacks) deleteFn(c context.Context, a vocab.Activity if err != nil { return err } - err = w.db.Lock(c, id) + var unlock func() + unlock, err = w.db.Lock(c, id) if err != nil { return err } - defer w.db.Unlock(c, id) + defer unlock() if err := w.db.Delete(c, id); err != nil { return err } @@ -373,16 +376,16 @@ func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivitySt // // If not then don't send a response. It was federated to us as an FYI, // by mistake, or some other reason. - if err := w.db.Lock(c, w.inboxIRI); err != nil { + unlock, err := w.db.Lock(c, w.inboxIRI) + if err != nil { return err } // WARNING: Unlock not deferred. actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI) + unlock() // unlock even on error if err != nil { - w.db.Unlock(c, w.inboxIRI) return err } - w.db.Unlock(c, w.inboxIRI) // Unlock must be called by now and every branch above. isMe := false if w.OnFollow != OnFollowDoNothing { @@ -434,13 +437,14 @@ func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivitySt // // If automatically rejecting, do not update the // followers collection. - if err := w.db.Lock(c, actorIRI); err != nil { + unlock, err := w.db.Lock(c, actorIRI) + if err != nil { return err } // WARNING: Unlock not deferred. followers, err := w.db.Followers(c, actorIRI) if err != nil { - w.db.Unlock(c, actorIRI) + unlock() return err } items := followers.GetActivityStreamsItems() @@ -451,21 +455,23 @@ func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivitySt for _, elem := range recipients { items.PrependIRI(elem) } - if err = w.db.Update(c, followers); err != nil { - w.db.Unlock(c, actorIRI) + err = w.db.Update(c, followers) + unlock() // unlock even on error + if err != nil { return err } - w.db.Unlock(c, actorIRI) // Unlock must be called by now and every branch above. } // Lock without defer! - w.db.Lock(c, w.inboxIRI) + unlock, err := w.db.Lock(c, w.inboxIRI) + if err != nil { + return err + } outboxIRI, err := w.db.OutboxForInbox(c, w.inboxIRI) + unlock() // unlock after, regardless if err != nil { - w.db.Unlock(c, w.inboxIRI) return err } - w.db.Unlock(c, w.inboxIRI) // Everything must be unlocked by now. if err := w.addNewIds(c, response); err != nil { return err @@ -484,16 +490,16 @@ func (w FederatingWrappedCallbacks) accept(c context.Context, a vocab.ActivitySt op := a.GetActivityStreamsObject() if op != nil && op.Len() > 0 { // Get this actor's id. - if err := w.db.Lock(c, w.inboxIRI); err != nil { + unlock, err := w.db.Lock(c, w.inboxIRI) + if err != nil { return err } // WARNING: Unlock not deferred. actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI) + unlock() // unlock after regardless if err != nil { - w.db.Unlock(c, w.inboxIRI) return err } - w.db.Unlock(c, w.inboxIRI) // Unlock must be called by now and every branch above. // // Determine if we are in a follow on the 'object' property. @@ -568,10 +574,11 @@ func (w FederatingWrappedCallbacks) accept(c context.Context, a vocab.ActivitySt // Use an anonymous function to properly scope the // database lock, immediately call it. err = func() error { - if err := w.db.Lock(c, maybeMyFollowIRI); err != nil { + unlock, err := w.db.Lock(c, maybeMyFollowIRI) + if err != nil { return err } - defer w.db.Unlock(c, maybeMyFollowIRI) + defer unlock() t, err := w.db.Get(c, maybeMyFollowIRI) if err != nil { return err @@ -630,13 +637,14 @@ func (w FederatingWrappedCallbacks) accept(c context.Context, a vocab.ActivitySt return err } // Add the peer to our following collection. - if err := w.db.Lock(c, actorIRI); err != nil { + unlock, err := w.db.Lock(c, actorIRI) + if err != nil { return err } // WARNING: Unlock not deferred. following, err := w.db.Following(c, actorIRI) if err != nil { - w.db.Unlock(c, actorIRI) + unlock() return err } items := following.GetActivityStreamsItems() @@ -647,16 +655,16 @@ func (w FederatingWrappedCallbacks) accept(c context.Context, a vocab.ActivitySt for iter := activityActors.Begin(); iter != activityActors.End(); iter = iter.Next() { id, err := ToId(iter) if err != nil { - w.db.Unlock(c, actorIRI) + unlock() return err } items.PrependIRI(id) } - if err = w.db.Update(c, following); err != nil { - w.db.Unlock(c, actorIRI) + err = w.db.Update(c, following) + unlock() // unlock after regardless + if err != nil { return err } - w.db.Unlock(c, actorIRI) // Unlock must be called by now and every branch above. } } @@ -729,10 +737,11 @@ func (w FederatingWrappedCallbacks) like(c context.Context, a vocab.ActivityStre if err != nil { return err } - if err := w.db.Lock(c, objId); err != nil { + unlock, err := w.db.Lock(c, objId) + if err != nil { return err } - defer w.db.Unlock(c, objId) + defer unlock() if owns, err := w.db.Owns(c, objId); err != nil { return err } else if !owns { @@ -810,10 +819,11 @@ func (w FederatingWrappedCallbacks) announce(c context.Context, a vocab.Activity if err != nil { return err } - if err := w.db.Lock(c, objId); err != nil { + unlock, err := w.db.Lock(c, objId) + if err != nil { return err } - defer w.db.Unlock(c, objId) + defer unlock() if owns, err := w.db.Owns(c, objId); err != nil { return err } else if !owns { diff --git a/vendor/github.com/superseriousbusiness/activity/pub/handlers.go b/vendor/github.com/superseriousbusiness/activity/pub/handlers.go index bc7eeb9d81..609478e80b 100644 --- a/vendor/github.com/superseriousbusiness/activity/pub/handlers.go +++ b/vendor/github.com/superseriousbusiness/activity/pub/handlers.go @@ -64,18 +64,20 @@ func NewActivityStreamsHandlerScheme(db Database, clock Clock, scheme string) Ha } isASRequest = true id := requestId(r, scheme) + + var unlock func() + // Lock and obtain a copy of the requested ActivityStreams value - err = db.Lock(c, id) + unlock, err = db.Lock(c, id) if err != nil { return } // WARNING: Unlock not deferred t, err := db.Get(c, id) + unlock() // unlock even on error if err != nil { - db.Unlock(c, id) return } - db.Unlock(c, id) // Unlock must have been called by this point and in every // branch above if t == nil { diff --git a/vendor/github.com/superseriousbusiness/activity/pub/side_effect_actor.go b/vendor/github.com/superseriousbusiness/activity/pub/side_effect_actor.go index 73b6c8a74e..a430ec8d3d 100644 --- a/vendor/github.com/superseriousbusiness/activity/pub/side_effect_actor.go +++ b/vendor/github.com/superseriousbusiness/activity/pub/side_effect_actor.go @@ -148,7 +148,7 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, // Obtain the id of the activity id := activity.GetJSONLDId() // Acquire a lock for the id. To be held for the rest of execution. - err := a.db.Lock(c, id.Get()) + unlock, err := a.db.Lock(c, id.Get()) if err != nil { return err } @@ -157,19 +157,18 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, // If the database already contains the activity, exit early. exists, err := a.db.Exists(c, id.Get()) if err != nil { - a.db.Unlock(c, id.Get()) + unlock() return err } else if exists { - a.db.Unlock(c, id.Get()) + unlock() return nil } // Attempt to create the activity entry. err = a.db.Create(c, activity) + unlock() // unlock even on error return if err != nil { - a.db.Unlock(c, id.Get()) return err } - a.db.Unlock(c, id.Get()) // Unlock by this point and in every branch above. // // 2. The values of 'to', 'cc', or 'audience' are Collections owned by @@ -212,19 +211,19 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, if err != nil { return err } - err = a.db.Lock(c, iri) + var unlock func() + unlock, err = a.db.Lock(c, iri) if err != nil { return err } // WARNING: Unlock is not deferred - if owns, err := a.db.Owns(c, iri); err != nil { - a.db.Unlock(c, iri) + owns, err := a.db.Owns(c, iri) + unlock() // unlock even on error + if err != nil { return err } else if !owns { - a.db.Unlock(c, iri) continue } - a.db.Unlock(c, iri) // Unlock by this point and in every branch above. myIRIs = append(myIRIs, iri) } @@ -236,7 +235,8 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, col := make(map[string]itemser) oCol := make(map[string]orderedItemser) for _, iri := range myIRIs { - err = a.db.Lock(c, iri) + var unlock func() + unlock, err = a.db.Lock(c, iri) if err != nil { return err } @@ -249,20 +249,20 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, if im, ok := t.(orderedItemser); ok { oCol[iri.String()] = im colIRIs = append(colIRIs, iri) - defer a.db.Unlock(c, iri) + defer unlock() } else { - a.db.Unlock(c, iri) + unlock() // unlock instantly } } else if streams.IsOrExtendsActivityStreamsCollection(t) { if im, ok := t.(itemser); ok { col[iri.String()] = im colIRIs = append(colIRIs, iri) - defer a.db.Unlock(c, iri) + defer unlock() } else { - a.db.Unlock(c, iri) + unlock() // unlock instantly } } else { - a.db.Unlock(c, iri) + unlock() // unlock instantly } } // If we own none of the Collection IRIs in 'to', 'cc', or 'audience' @@ -409,17 +409,17 @@ func (a *sideEffectActor) Deliver(c context.Context, outboxIRI *url.URL, activit // WrapInCreate wraps an object with a Create activity. func (a *sideEffectActor) WrapInCreate(c context.Context, obj vocab.Type, outboxIRI *url.URL) (create vocab.ActivityStreamsCreate, err error) { - err = a.db.Lock(c, outboxIRI) + var unlock func() + unlock, err = a.db.Lock(c, outboxIRI) if err != nil { return } // WARNING: No deferring the Unlock actorIRI, err := a.db.ActorForOutbox(c, outboxIRI) + unlock() // unlock after regardless if err != nil { - a.db.Unlock(c, outboxIRI) return } - a.db.Unlock(c, outboxIRI) // Unlock the lock at this point and every branch above return wrapInCreate(c, obj, actorIRI) } @@ -447,26 +447,25 @@ func (a *sideEffectActor) deliverToRecipients(c context.Context, boxIRI *url.URL func (a *sideEffectActor) addToOutbox(c context.Context, outboxIRI *url.URL, activity Activity) error { // Set the activity in the database first. id := activity.GetJSONLDId() - err := a.db.Lock(c, id.Get()) + unlock, err := a.db.Lock(c, id.Get()) if err != nil { return err } // WARNING: Unlock not deferred err = a.db.Create(c, activity) + unlock() // unlock after regardless if err != nil { - a.db.Unlock(c, id.Get()) return err } - a.db.Unlock(c, id.Get()) // WARNING: Unlock(c, id) should be called by this point and in every // return before here. // // Acquire a lock to read the outbox. Defer release. - err = a.db.Lock(c, outboxIRI) + unlock, err = a.db.Lock(c, outboxIRI) if err != nil { return err } - defer a.db.Unlock(c, outboxIRI) + defer unlock() outbox, err := a.db.GetOutbox(c, outboxIRI) if err != nil { return err @@ -491,11 +490,12 @@ func (a *sideEffectActor) addToOutbox(c context.Context, outboxIRI *url.URL, act // Returns true when the activity is novel. func (a *sideEffectActor) addToInboxIfNew(c context.Context, inboxIRI *url.URL, activity Activity) (isNew bool, err error) { // Acquire a lock to read the inbox. Defer release. - err = a.db.Lock(c, inboxIRI) + var unlock func() + unlock, err = a.db.Lock(c, inboxIRI) if err != nil { return } - defer a.db.Unlock(c, inboxIRI) + defer unlock() // Obtain the id of the activity id := activity.GetJSONLDId() // If the inbox already contains the URL, early exit. @@ -539,19 +539,18 @@ func (a *sideEffectActor) hasInboxForwardingValues(c context.Context, inboxIRI * types, iris := getInboxForwardingValues(val) // For IRIs, simply check if we own them. for _, iri := range iris { - err := a.db.Lock(c, iri) + unlock, err := a.db.Lock(c, iri) if err != nil { return false, err } // WARNING: Unlock is not deferred - if owns, err := a.db.Owns(c, iri); err != nil { - a.db.Unlock(c, iri) + owns, err := a.db.Owns(c, iri) + unlock() // unlock after regardless + if err != nil { return false, err } else if owns { - a.db.Unlock(c, iri) return true, nil } - a.db.Unlock(c, iri) // Unlock by this point and in every branch above } // For embedded literals, check the id. @@ -560,19 +559,19 @@ func (a *sideEffectActor) hasInboxForwardingValues(c context.Context, inboxIRI * if err != nil { return false, err } - err = a.db.Lock(c, id) + var unlock func() + unlock, err = a.db.Lock(c, id) if err != nil { return false, err } // WARNING: Unlock is not deferred - if owns, err := a.db.Owns(c, id); err != nil { - a.db.Unlock(c, id) + owns, err := a.db.Owns(c, id) + unlock() // unlock after regardless + if err != nil { return false, err } else if owns { - a.db.Unlock(c, id) return true, nil } - a.db.Unlock(c, id) // Unlock by this point and in every branch above } // Recur Preparation: Try fetching the IRIs so we can recur into them. @@ -683,7 +682,8 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit foundInboxesFromDB := []*url.URL{} for _, actorIRI := range r { // BEGIN LOCK - err = a.db.Lock(c, actorIRI) + var unlock func() + unlock, err = a.db.Lock(c, actorIRI) if err != nil { return } @@ -691,7 +691,7 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit inboxes, err := a.db.InboxesForIRI(c, actorIRI) if err != nil { // bail on error - a.db.Unlock(c, actorIRI) + unlock() return nil, err } @@ -699,16 +699,13 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit // we have a hit foundInboxesFromDB = append(foundInboxesFromDB, inboxes...) - // if we found inboxes for this iri, we should remove it from + // if we found inboxes for this iri, we should remove it from // the list of actors/iris we still need to dereference r = removeOne(r, actorIRI) } // END LOCK - a.db.Unlock(c, actorIRI) - if err != nil { - return nil, err - } + unlock() } // look for any actors' inboxes that weren't already discovered above; @@ -733,25 +730,25 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit targets = append(targets, foundInboxesFromRemote...) // Get inboxes of sender. - err = a.db.Lock(c, outboxIRI) + var unlock func() + unlock, err = a.db.Lock(c, outboxIRI) if err != nil { return } // WARNING: No deferring the Unlock actorIRI, err := a.db.ActorForOutbox(c, outboxIRI) + unlock() // unlock after regardless if err != nil { - a.db.Unlock(c, outboxIRI) return } - a.db.Unlock(c, outboxIRI) // Get the inbox on the sender. - err = a.db.Lock(c, actorIRI) + unlock, err = a.db.Lock(c, actorIRI) if err != nil { return nil, err } // BEGIN LOCK thisActor, err := a.db.Get(c, actorIRI) - a.db.Unlock(c, actorIRI) + unlock() // END LOCK -- Still need to handle err if err != nil { return nil, err diff --git a/vendor/github.com/superseriousbusiness/activity/pub/social_wrapped_callbacks.go b/vendor/github.com/superseriousbusiness/activity/pub/social_wrapped_callbacks.go index a652e337ff..868354af16 100644 --- a/vendor/github.com/superseriousbusiness/activity/pub/social_wrapped_callbacks.go +++ b/vendor/github.com/superseriousbusiness/activity/pub/social_wrapped_callbacks.go @@ -259,11 +259,12 @@ func (w SocialWrappedCallbacks) create(c context.Context, a vocab.ActivityStream if err != nil { return err } - err = w.db.Lock(c, id) + var unlock func() + unlock, err = w.db.Lock(c, id) if err != nil { return err } - defer w.db.Unlock(c, id) + defer unlock() if err := w.db.Create(c, obj); err != nil { return err } @@ -301,11 +302,11 @@ func (w SocialWrappedCallbacks) update(c context.Context, a vocab.ActivityStream // Create anonymous loop function to be able to properly scope the defer // for the database lock at each iteration. loopFn := func(idx int, loopId *url.URL) error { - err := w.db.Lock(c, loopId) + unlock, err := w.db.Lock(c, loopId) if err != nil { return err } - defer w.db.Unlock(c, loopId) + defer unlock() t, err := w.db.Get(c, loopId) if err != nil { return err @@ -371,11 +372,11 @@ func (w SocialWrappedCallbacks) deleteFn(c context.Context, a vocab.ActivityStre // Create anonymous loop function to be able to properly scope the defer // for the database lock at each iteration. loopFn := func(idx int, loopId *url.URL) error { - err := w.db.Lock(c, loopId) + unlock, err := w.db.Lock(c, loopId) if err != nil { return err } - defer w.db.Unlock(c, loopId) + defer unlock() t, err := w.db.Get(c, loopId) if err != nil { return err @@ -458,23 +459,24 @@ func (w SocialWrappedCallbacks) like(c context.Context, a vocab.ActivityStreamsL return ErrObjectRequired } // Get this actor's IRI. - if err := w.db.Lock(c, w.outboxIRI); err != nil { + unlock, err := w.db.Lock(c, w.outboxIRI) + if err != nil { return err } // WARNING: Unlock not deferred. actorIRI, err := w.db.ActorForOutbox(c, w.outboxIRI) + unlock() // unlock even on error if err != nil { - w.db.Unlock(c, w.outboxIRI) return err } - w.db.Unlock(c, w.outboxIRI) // Unlock must be called by now and every branch above. // // Now obtain this actor's 'liked' collection. - if err := w.db.Lock(c, actorIRI); err != nil { + unlock, err = w.db.Lock(c, actorIRI) + if err != nil { return err } - defer w.db.Unlock(c, actorIRI) + defer unlock() liked, err := w.db.Liked(c, actorIRI) if err != nil { return err diff --git a/vendor/github.com/superseriousbusiness/activity/pub/util.go b/vendor/github.com/superseriousbusiness/activity/pub/util.go index d8937bba23..a0675b76e1 100644 --- a/vendor/github.com/superseriousbusiness/activity/pub/util.go +++ b/vendor/github.com/superseriousbusiness/activity/pub/util.go @@ -753,7 +753,8 @@ func mustHaveActivityActorsMatchObjectActors(c context.Context, actors vocab.ActivityStreamsActorProperty, op vocab.ActivityStreamsObjectProperty, newTransport func(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error), - boxIRI *url.URL) error { + boxIRI *url.URL, +) error { activityActorMap := make(map[string]bool, actors.Len()) for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() { id, err := ToId(iter) @@ -808,7 +809,8 @@ func mustHaveActivityActorsMatchObjectActors(c context.Context, func add(c context.Context, op vocab.ActivityStreamsObjectProperty, target vocab.ActivityStreamsTargetProperty, - db Database) error { + db Database, +) error { opIds := make([]*url.URL, 0, op.Len()) for iter := op.Begin(); iter != op.End(); iter = iter.Next() { id, err := ToId(iter) @@ -828,10 +830,11 @@ func add(c context.Context, // Create anonymous loop function to be able to properly scope the defer // for the database lock at each iteration. loopFn := func(t *url.URL) error { - if err := db.Lock(c, t); err != nil { + unlock, err := db.Lock(c, t) + if err != nil { return err } - defer db.Unlock(c, t) + defer unlock() if owns, err := db.Owns(c, t); err != nil { return err } else if !owns { @@ -889,7 +892,8 @@ func add(c context.Context, func remove(c context.Context, op vocab.ActivityStreamsObjectProperty, target vocab.ActivityStreamsTargetProperty, - db Database) error { + db Database, +) error { opIds := make(map[string]bool, op.Len()) for iter := op.Begin(); iter != op.End(); iter = iter.Next() { id, err := ToId(iter) @@ -909,10 +913,11 @@ func remove(c context.Context, // Create anonymous loop function to be able to properly scope the defer // for the database lock at each iteration. loopFn := func(t *url.URL) error { - if err := db.Lock(c, t); err != nil { + unlock, err := db.Lock(c, t) + if err != nil { return err } - defer db.Unlock(c, t) + defer unlock() if owns, err := db.Owns(c, t); err != nil { return err } else if !owns { diff --git a/vendor/modules.txt b/vendor/modules.txt index f7581a5189..c90b40b1e2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -284,7 +284,7 @@ github.com/stretchr/testify/suite # github.com/subosito/gotenv v1.2.0 ## explicit github.com/subosito/gotenv -# github.com/superseriousbusiness/activity v1.0.1-0.20220405135100-18e8f86a760a +# github.com/superseriousbusiness/activity v1.1.0-gts ## explicit; go 1.18 github.com/superseriousbusiness/activity/pub github.com/superseriousbusiness/activity/streams