Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] small changes missed in previous dereferencer.GetAccount() PRs #1467

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 110 additions & 55 deletions internal/federation/dereferencing/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
}
}

// Pre-fetch a transport for requesting username, used by later deref procedures.
transport, err := d.transportController.NewTransportForUsername(ctx, requestUser)
if err != nil {
return nil, fmt.Errorf("enrichAccount: couldn't create transport: %w", err)
Expand All @@ -163,19 +164,14 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.

if err == nil {
if account.Domain != accDomain {
// We have the correct accountDomain now; if it was different from
// the account domain we were provided, do another db lookup to check
// if we already had the account in the db under the account domain we
// just discovered, otherwise we risk thinking this is a new account
// and trying to put it into the database again (which will cause issues).
// After webfinger, we now have correct account domain from which we can do a final DB check.
alreadyAccount, err := d.db.GetAccountByUsernameDomain(ctx, account.Username, accDomain)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return nil, fmt.Errorf("enrichAccount: db err looking for account again after webfinger: %w", err)
}

if err == nil {
// We already had the account in the database;
// continue by enriching that one instead.
// Enrich existing account.
account = alreadyAccount
}
}
Expand All @@ -197,14 +193,14 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
}
}

// Check whether this account URI is a blocked domain / subdomain
// Check whether this account URI is a blocked domain / subdomain.
if blocked, err := d.db.IsDomainBlocked(ctx, uri.Host); err != nil {
return nil, newErrDB(fmt.Errorf("enrichAccount: error checking blocked domain: %w", err))
} else if blocked {
return nil, fmt.Errorf("enrichAccount: %s is blocked", uri.Host)
}

// Mark deref+update handshake start
// Mark deref+update handshake start.
d.startHandshake(requestUser, uri)
defer d.stopHandshake(requestUser, uri)

Expand All @@ -225,7 +221,7 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
if account.Username == "" {
// No username was provided, so no webfinger was attempted earlier.
//
// Now we have a username we can attempt it now, this ensures up-to-date accountdomain info.
// Now we have a username we can attempt it, this ensures up-to-date accountdomain info.
accDomain, _, err := d.fingerRemoteAccount(ctx, transport, latestAcc.Username, uri.Host)

if err == nil {
Expand All @@ -238,32 +234,32 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
latestAcc.ID = account.ID
latestAcc.FetchedAt = time.Now()

// Fetch latest account avatar only if remote URI has changed
if latestAcc.AvatarRemoteURL != "" && latestAcc.AvatarRemoteURL != account.AvatarRemoteURL {
d.dereferencingAvatarsLock.Lock()
newAvatarID, err := d.fetchRemoteAccountMedia(ctx, transport, latestAcc.AvatarRemoteURL, latestAcc.ID, d.dereferencingAvatars, true, false)
d.dereferencingAvatarsLock.Unlock()
// Use the existing account media attachments by default.
latestAcc.AvatarMediaAttachmentID = account.AvatarMediaAttachmentID
latestAcc.HeaderMediaAttachmentID = account.HeaderMediaAttachmentID

if latestAcc.AvatarRemoteURL != account.AvatarRemoteURL && latestAcc.AvatarRemoteURL != "" {
// Account avatar URL has changed; fetch up-to-date copy and use new media ID.
latestAcc.AvatarMediaAttachmentID, err = d.fetchRemoteAccountAvatar(ctx,
transport,
latestAcc.AvatarRemoteURL,
latestAcc.ID,
)
if err != nil {
log.Errorf("error fetching remote avatar for account %s: %v", uri, err)
} else {
latestAcc.AvatarMediaAttachmentID = newAvatarID
}
} else {
latestAcc.AvatarMediaAttachmentID = account.AvatarMediaAttachmentID // no change / empty url
}

// Fetch latest account header only if remote URI has changed
if latestAcc.AvatarRemoteURL != "" && latestAcc.AvatarRemoteURL != account.AvatarRemoteURL {
d.dereferencingHeadersLock.Lock()
newHeaderID, err := d.fetchRemoteAccountMedia(ctx, transport, latestAcc.HeaderRemoteURL, latestAcc.ID, d.dereferencingHeaders, false, true)
d.dereferencingHeadersLock.Unlock()
if latestAcc.HeaderRemoteURL != account.HeaderRemoteURL && latestAcc.HeaderRemoteURL != "" {
// Account header URL has changed; fetch up-to-date copy and use new media ID.
latestAcc.HeaderMediaAttachmentID, err = d.fetchRemoteAccountHeader(ctx,
transport,
latestAcc.HeaderRemoteURL,
latestAcc.ID,
)
if err != nil {
log.Errorf("error fetching remote header for account %s: %v", uri, err)
} else {
latestAcc.HeaderMediaAttachmentID = newHeaderID
}
} else {
latestAcc.HeaderMediaAttachmentID = account.HeaderMediaAttachmentID // no change / empty url
}

// Fetch the latest remote account emoji IDs used in account display name/bio.
Expand Down Expand Up @@ -338,47 +334,106 @@ func (d *deref) dereferenceAccountable(ctx context.Context, transport transport.
return nil, newErrWrongType(fmt.Errorf("DereferenceAccountable: type name %s not supported as Accountable", t.GetTypeName()))
}

func (d *deref) fetchRemoteAccountMedia(
ctx context.Context,
transport transport.Transport,
mediaRemoteURL string,
targetAccountID string,
dereferencingMap map[string]*media.ProcessingMedia,
avatar bool,
header bool,
) (string, error) {
// first check if we're already processing this media
if alreadyProcessing, ok := dereferencingMap[targetAccountID]; ok {
// we're already on it, nothing else to do
return alreadyProcessing.AttachmentID(), nil
}

avatarIRI, err := url.Parse(mediaRemoteURL)
func (d *deref) fetchRemoteAccountAvatar(ctx context.Context, tsport transport.Transport, avatarURL string, accountID string) (string, error) {
// Parse and validate provided media URL.
avatarURI, err := url.Parse(avatarURL)
if err != nil {
return "", err
}

// Acquire lock for derefs map.
unlock := d.derefAvatarsMu.Lock()
defer unlock()

if processing, ok := d.derefAvatars[avatarURL]; ok {
// we're already dereferencing it, nothing to do.
return processing.AttachmentID(), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps before returning, it's worth calling processing.LoadAttachment to make sure that the attachment will really be in the database for the caller to select with the ID; it's idempotent anyway I think so it should be fine.

}

// Set the media data function to dereference avatar from URI.
data := func(ctx context.Context) (io.ReadCloser, int64, error) {
return tsport.DereferenceMedia(ctx, avatarURI)
}

// Create new media processing request from the media manager instance.
processing, err := d.mediaManager.ProcessMedia(ctx, data, nil, accountID, &media.AdditionalMediaInfo{
Avatar: func() *bool { v := false; return &v }(),
RemoteURL: &avatarURL,
})
if err != nil {
return "", err
}

// Store media in map to mark as processing.
d.derefAvatars[avatarURL] = processing

// Unlock map.
unlock()

defer func() {
// On exit safely remove media from map.
unlock := d.derefAvatarsMu.Lock()
delete(d.derefAvatars, avatarURL)
unlock()
}()

// Start media attachment loading (blocking call).
if _, err := processing.LoadAttachment(ctx); err != nil {
return "", err
}

return processing.AttachmentID(), nil
}

func (d *deref) fetchRemoteAccountHeader(ctx context.Context, tsport transport.Transport, headerURL string, accountID string) (string, error) {
// Parse and validate provided media URL.
headerURI, err := url.Parse(headerURL)
if err != nil {
return "", err
}

data := func(innerCtx context.Context) (io.ReadCloser, int64, error) {
return transport.DereferenceMedia(innerCtx, avatarIRI)
// Acquire lock for derefs map.
unlock := d.derefHeadersMu.Lock()
defer unlock()

if processing, ok := d.derefHeaders[headerURL]; ok {
// we're already dereferencing it, nothing to do.
return processing.AttachmentID(), nil
}

// Set the media data function to dereference header from URI.
data := func(ctx context.Context) (io.ReadCloser, int64, error) {
return tsport.DereferenceMedia(ctx, headerURI)
}

processingMedia, err := d.mediaManager.ProcessMedia(ctx, data, nil, targetAccountID, &media.AdditionalMediaInfo{
RemoteURL: &mediaRemoteURL,
Avatar: &avatar,
Header: &header,
// Create new media processing request from the media manager instance.
processing, err := d.mediaManager.ProcessMedia(ctx, data, nil, accountID, &media.AdditionalMediaInfo{
Header: func() *bool { v := true; return &v }(),
RemoteURL: &headerURL,
})
if err != nil {
return "", err
}

// store it in our map to indicate it's in process
dereferencingMap[targetAccountID] = processingMedia
defer delete(dereferencingMap, targetAccountID)
if _, err := processingMedia.LoadAttachment(ctx); err != nil {
// Store media in map to mark as processing.
d.derefHeaders[headerURL] = processing

// Unlock map.
unlock()

defer func() {
// On exit safely remove media from map.
unlock := d.derefHeadersMu.Lock()
delete(d.derefHeaders, headerURL)
unlock()
}()

// Start media attachment loading (blocking call).
if _, err := processing.LoadAttachment(ctx); err != nil {
return "", err
}

return processingMedia.AttachmentID(), nil
return processing.AttachmentID(), nil
}

func (d *deref) fetchRemoteAccountEmojis(ctx context.Context, targetAccount *gtsmodel.Account, requestingUsername string) (bool, error) {
Expand Down
47 changes: 27 additions & 20 deletions internal/federation/dereferencing/dereferencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/url"
"sync"

"codeberg.org/gruf/go-mutexes"
"github.com/superseriousbusiness/gotosocial/internal/ap"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
Expand Down Expand Up @@ -58,30 +59,36 @@ type Dereferencer interface {
}

type deref struct {
db db.DB
typeConverter typeutils.TypeConverter
transportController transport.Controller
mediaManager media.Manager
dereferencingAvatars map[string]*media.ProcessingMedia
dereferencingAvatarsLock sync.Mutex
dereferencingHeaders map[string]*media.ProcessingMedia
dereferencingHeadersLock sync.Mutex
dereferencingEmojis map[string]*media.ProcessingEmoji
dereferencingEmojisLock sync.Mutex
handshakes map[string][]*url.URL
handshakeSync sync.Mutex // mutex to lock/unlock when checking or updating the handshakes map
db db.DB
typeConverter typeutils.TypeConverter
transportController transport.Controller
mediaManager media.Manager
derefAvatars map[string]*media.ProcessingMedia
derefAvatarsMu mutexes.Mutex
derefHeaders map[string]*media.ProcessingMedia
derefHeadersMu mutexes.Mutex
derefEmojis map[string]*media.ProcessingEmoji
derefEmojisMu mutexes.Mutex
handshakes map[string][]*url.URL
handshakeSync sync.Mutex // mutex to lock/unlock when checking or updating the handshakes map
}

// NewDereferencer returns a Dereferencer initialized with the given parameters.
func NewDereferencer(db db.DB, typeConverter typeutils.TypeConverter, transportController transport.Controller, mediaManager media.Manager) Dereferencer {
return &deref{
db: db,
typeConverter: typeConverter,
transportController: transportController,
mediaManager: mediaManager,
dereferencingAvatars: make(map[string]*media.ProcessingMedia),
dereferencingHeaders: make(map[string]*media.ProcessingMedia),
dereferencingEmojis: make(map[string]*media.ProcessingEmoji),
handshakes: make(map[string][]*url.URL),
db: db,
typeConverter: typeConverter,
transportController: transportController,
mediaManager: mediaManager,
derefAvatars: make(map[string]*media.ProcessingMedia),
derefHeaders: make(map[string]*media.ProcessingMedia),
derefEmojis: make(map[string]*media.ProcessingEmoji),
handshakes: make(map[string][]*url.URL),

// use wrapped mutexes to allow safely deferring unlock
// even when more granular locks are required (only unlocks once).
derefAvatarsMu: mutexes.WithSafety(mutexes.New()),
derefHeadersMu: mutexes.WithSafety(mutexes.New()),
derefEmojisMu: mutexes.WithSafety(mutexes.New()),
}
}
33 changes: 15 additions & 18 deletions internal/federation/dereferencing/emoji.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ func (d *deref) GetRemoteEmoji(ctx context.Context, requestingUsername string, r
processingEmoji *media.ProcessingEmoji
)

d.dereferencingEmojisLock.Lock() // LOCK HERE
// Acquire lock for derefs map.
unlock := d.derefEmojisMu.Lock()
defer unlock()

// first check if we're already processing this emoji
if alreadyProcessing, ok := d.dereferencingEmojis[shortcodeDomain]; ok {
if alreadyProcessing, ok := d.derefEmojis[shortcodeDomain]; ok {
// we're already on it, no worries
processingEmoji = alreadyProcessing
} else {
// not processing it yet, let's start
t, err := d.transportController.NewTransportForUsername(ctx, requestingUsername)
if err != nil {
d.dereferencingEmojisLock.Unlock()
return nil, fmt.Errorf("GetRemoteEmoji: error creating transport to fetch emoji %s: %s", shortcodeDomain, err)
}

derefURI, err := url.Parse(remoteURL)
if err != nil {
d.dereferencingEmojisLock.Unlock()
return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err)
}

Expand All @@ -63,29 +63,26 @@ func (d *deref) GetRemoteEmoji(ctx context.Context, requestingUsername string, r

newProcessing, err := d.mediaManager.ProcessEmoji(ctx, dataFunc, nil, shortcode, id, emojiURI, ai, refresh)
if err != nil {
d.dereferencingEmojisLock.Unlock()
return nil, fmt.Errorf("GetRemoteEmoji: error processing emoji %s: %s", shortcodeDomain, err)
}

// store it in our map to indicate it's in process
d.dereferencingEmojis[shortcodeDomain] = newProcessing
d.derefEmojis[shortcodeDomain] = newProcessing
processingEmoji = newProcessing
}

d.dereferencingEmojisLock.Unlock()
// Unlock map.
unlock()

load := func(innerCtx context.Context) error {
_, err := processingEmoji.LoadEmoji(innerCtx)
return err
}

cleanup := func() {
d.dereferencingEmojisLock.Lock()
delete(d.dereferencingHeaders, shortcodeDomain)
d.dereferencingEmojisLock.Unlock()
}
defer func() {
// On exit safely remove emoji from map.
unlock := d.derefEmojisMu.Lock()
delete(d.derefEmojis, shortcodeDomain)
unlock()
}()

if err := loadAndCleanup(ctx, load, cleanup); err != nil {
// Start emoji attachment loading (blocking call).
if _, err := processingEmoji.LoadEmoji(ctx); err != nil {
return nil, err
}

Expand Down
Loading