Skip to content

Commit

Permalink
all: add support for direct media
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Nov 6, 2024
1 parent 733dc14 commit 8d042cc
Show file tree
Hide file tree
Showing 13 changed files with 498 additions and 62 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/lib/pq v1.10.9
github.com/rs/zerolog v1.33.0
go.mau.fi/util v0.8.1
go.mau.fi/util v0.8.2-0.20241106111346-576742786fe9
go.mau.fi/webp v0.1.0
go.mau.fi/whatsmeow v0.0.0-20241030164414-f98aea1881f6
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
Expand All @@ -18,7 +18,7 @@ require (
golang.org/x/sync v0.8.0
google.golang.org/protobuf v1.35.1
gopkg.in/yaml.v3 v3.0.1
maunium.net/go/mautrix v0.21.1
maunium.net/go/mautrix v0.21.2-0.20241106121131-f588c35d8b1c
)

require (
Expand All @@ -37,7 +37,7 @@ require (
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
github.com/yuin/goldmark v1.7.7 // indirect
github.com/yuin/goldmark v1.7.8 // indirect
go.mau.fi/libsignal v0.1.1 // indirect
go.mau.fi/zeroconfig v0.1.3 // indirect
golang.org/x/crypto v0.28.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/yuin/goldmark v1.7.7 h1:5m9rrB1sW3JUMToKFQfb+FGt1U7r57IHu5GrYrG2nqU=
github.com/yuin/goldmark v1.7.7/go.mod h1:uzxRWxtg69N339t3louHJ7+O03ezfj6PlliRlaOzY1E=
github.com/yuin/goldmark v1.7.8 h1:iERMLn0/QJeHFhxSt3p6PeN9mGnvIKSpG9YYorDMnic=
github.com/yuin/goldmark v1.7.8/go.mod h1:uzxRWxtg69N339t3louHJ7+O03ezfj6PlliRlaOzY1E=
go.mau.fi/libsignal v0.1.1 h1:m/0PGBh4QKP/I1MQ44ti4C0fMbLMuHb95cmDw01FIpI=
go.mau.fi/libsignal v0.1.1/go.mod h1:QLs89F/OA3ThdSL2Wz2p+o+fi8uuQUz0e1BRa6ExdBw=
go.mau.fi/util v0.8.1 h1:Ga43cz6esQBYqcjZ/onRoVnYWoUwjWbsxVeJg2jOTSo=
go.mau.fi/util v0.8.1/go.mod h1:T1u/rD2rzidVrBLyaUdPpZiJdP/rsyi+aTzn0D+Q6wc=
go.mau.fi/util v0.8.2-0.20241106111346-576742786fe9 h1:zYcb/lTZudowXAjKi6Yc2/2y5xxglPFfy9ZT2pNGsuM=
go.mau.fi/util v0.8.2-0.20241106111346-576742786fe9/go.mod h1:T1u/rD2rzidVrBLyaUdPpZiJdP/rsyi+aTzn0D+Q6wc=
go.mau.fi/webp v0.1.0 h1:BHObH/DcFntT9KYun5pDr0Ot4eUZO8k2C7eP7vF4ueA=
go.mau.fi/webp v0.1.0/go.mod h1:e42Z+VMFrUMS9cpEwGRIor+lQWO8oUAyPyMtcL+NMt8=
go.mau.fi/whatsmeow v0.0.0-20241030164414-f98aea1881f6 h1:ibChSQNQa6WTO+jUuJQz9x7qwCQoeIl/zlNCa/dAtvg=
Expand Down Expand Up @@ -101,5 +101,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/mautrix v0.21.1 h1:Z+e448jtlY977iC1kokNJTH5kg2WmDpcQCqn+v9oZOA=
maunium.net/go/mautrix v0.21.1/go.mod h1:7F/S6XAdyc/6DW+Q7xyFXRSPb6IjfqMb1OMepQ8C8OE=
maunium.net/go/mautrix v0.21.2-0.20241106121131-f588c35d8b1c h1:X8EZp6mkboTX3ACDpYQgFW2Vb98DF2pgcnqrAfqSPrI=
maunium.net/go/mautrix v0.21.2-0.20241106121131-f588c35d8b1c/go.mod h1:UBuBMbPJfh1AqYc1K1Lr0eQclx5vs1k1iiLVO/iMyw4=
21 changes: 12 additions & 9 deletions pkg/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ func (wa *WhatsAppConnector) LoadUserLogin(_ context.Context, login *bridgev2.Us
Main: wa,
UserLogin: login,

historySyncs: make(chan *waHistorySync.HistorySync, 64),
resyncQueue: make(map[types.JID]resyncQueueItem),
mediaRetryLock: semaphore.NewWeighted(wa.Config.HistorySync.MediaRequests.MaxAsyncHandle),
historySyncs: make(chan *waHistorySync.HistorySync, 64),
resyncQueue: make(map[types.JID]resyncQueueItem),
directMediaRetries: make(map[networkid.MessageID]*directMediaRetry),
mediaRetryLock: semaphore.NewWeighted(wa.Config.HistorySync.MediaRequests.MaxAsyncHandle),
}
login.Client = w

Expand Down Expand Up @@ -87,12 +88,14 @@ type WhatsAppClient struct {
Device *store.Device
JID types.JID

historySyncs chan *waHistorySync.HistorySync
stopLoops atomic.Pointer[context.CancelFunc]
resyncQueue map[types.JID]resyncQueueItem
resyncQueueLock sync.Mutex
nextResync time.Time
mediaRetryLock *semaphore.Weighted
historySyncs chan *waHistorySync.HistorySync
stopLoops atomic.Pointer[context.CancelFunc]
resyncQueue map[types.JID]resyncQueueItem
resyncQueueLock sync.Mutex
nextResync time.Time
directMediaRetries map[networkid.MessageID]*directMediaRetry
directMediaLock sync.Mutex
mediaRetryLock *semaphore.Weighted

lastPhoneOfflineWarning time.Time
}
Expand Down
210 changes: 210 additions & 0 deletions pkg/connector/directmedia.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
// Copyright (C) 2024 Tulir Asokan
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package connector

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"sync"

"github.com/rs/zerolog"
"go.mau.fi/util/exsync"
"go.mau.fi/whatsmeow"
"go.mau.fi/whatsmeow/proto/waMmsRetry"
"go.mau.fi/whatsmeow/types/events"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/database"
"maunium.net/go/mautrix/bridgev2/networkid"
"maunium.net/go/mautrix/mediaproxy"

"go.mau.fi/mautrix-whatsapp/pkg/msgconv"
"go.mau.fi/mautrix-whatsapp/pkg/waid"
)

var _ bridgev2.DirectMediableNetwork = (*WhatsAppConnector)(nil)

func (wa *WhatsAppConnector) SetUseDirectMedia() {
wa.MsgConv.DirectMedia = true
}

var ErrReloadNeeded = mautrix.RespError{
ErrCode: "FI.MAU.WHATSAPP_RELOAD_NEEDED",
Err: "Media is no longer available on WhatsApp servers and must be re-requested from your phone",
StatusCode: http.StatusNotFound,
}

func (wa *WhatsAppConnector) Download(ctx context.Context, mediaID networkid.MediaID, params map[string]string) (mediaproxy.GetMediaResponse, error) {
parsedID, receiverID, err := waid.ParseMediaID(mediaID)
if err != nil {
return nil, err
}
msg, err := wa.Bridge.DB.Message.GetFirstPartByID(ctx, receiverID, parsedID.String())
if err != nil {
return nil, fmt.Errorf("failed to get message: %w", err)
} else if msg == nil {
return nil, fmt.Errorf("message not found")
}
dmm := msg.Metadata.(*waid.MessageMetadata).DirectMediaMeta
if dmm == nil {
return nil, fmt.Errorf("message does not have direct media metadata")
}
var keys *msgconv.FailedMediaKeys
err = json.Unmarshal(dmm, &keys)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal media keys: %w", err)
}
var ul *bridgev2.UserLogin
if receiverID != "" {
ul = wa.Bridge.GetCachedUserLoginByID(receiverID)
} else {
logins, err := wa.Bridge.GetUserLoginsInPortal(ctx, msg.Room)
if err != nil {
return nil, fmt.Errorf("failed to get user logins in portal: %w", err)
}
for _, login := range logins {
if login.Client.IsLoggedIn() {
ul = login
break
}
}
}
if ul == nil || !ul.Client.IsLoggedIn() {
return nil, fmt.Errorf("no logged in user found")
}
waClient := ul.Client.(*WhatsAppClient)
if waClient.Client == nil {
return nil, fmt.Errorf("no WhatsApp client found on login")
}
return &mediaproxy.GetMediaResponseFile{
Callback: func(f *os.File) error {
err := waClient.Client.DownloadToFile(keys, f)
if errors.Is(err, whatsmeow.ErrMediaDownloadFailedWith403) || errors.Is(err, whatsmeow.ErrMediaDownloadFailedWith404) || errors.Is(err, whatsmeow.ErrMediaDownloadFailedWith410) {
if _, tryReload := params["fi.mau.whatsapp.reload_media"]; !tryReload {
return ErrReloadNeeded
}
err = waClient.requestAndWaitDirectMedia(ctx, msg.ID, keys)
if err != nil {
return err
}
err = waClient.Client.DownloadToFile(keys, f)
}
if errors.Is(err, whatsmeow.ErrFileLengthMismatch) || errors.Is(err, whatsmeow.ErrInvalidMediaSHA256) {
zerolog.Ctx(ctx).Warn().Err(err).Msg("Mismatching media checksums in message. Ignoring because WhatsApp seems to ignore them too")
} else if err != nil {
return err
}
return nil
},
// TODO?
ContentType: "",
}, nil
}

type directMediaRetry struct {
sync.Mutex
resultURL string
wait *exsync.Event
requested bool
}

func (wa *WhatsAppClient) getDirectMediaRetryState(msgID networkid.MessageID, create bool) *directMediaRetry {
wa.directMediaLock.Lock()
defer wa.directMediaLock.Unlock()
retry, ok := wa.directMediaRetries[msgID]
if !ok && create {
retry = &directMediaRetry{
wait: exsync.NewEvent(),
}
wa.directMediaRetries[msgID] = retry
}
return retry
}

func (wa *WhatsAppClient) requestAndWaitDirectMedia(ctx context.Context, rawMsgID networkid.MessageID, keys *msgconv.FailedMediaKeys) error {
state, err := wa.requestDirectMedia(rawMsgID, keys.Key)
if err != nil {
return err
}
select {
case <-state.wait.GetChan():
keys.DirectPath = state.resultURL
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func (wa *WhatsAppClient) requestDirectMedia(rawMsgID networkid.MessageID, key []byte) (*directMediaRetry, error) {
state := wa.getDirectMediaRetryState(rawMsgID, true)
state.Lock()
defer state.Unlock()
if !state.requested {
err := wa.sendMediaRequestDirect(rawMsgID, key)
if err != nil {
return nil, fmt.Errorf("failed to send media retry request: %w", err)
}
state.requested = true
}
return state, nil
}

func (wa *WhatsAppClient) receiveDirectMediaRetry(ctx context.Context, msg *database.Message, retry *events.MediaRetry) {
state := wa.getDirectMediaRetryState(msg.ID, false)
if state != nil {
state.Lock()
defer state.Unlock()
}
log := zerolog.Ctx(ctx)
var keys msgconv.FailedMediaKeys
err := json.Unmarshal(msg.Metadata.(*waid.MessageMetadata).DirectMediaMeta, &keys)
if err != nil {
log.Err(err).Msg("Failed to parse direct media metadata for media retry")
return
}
retryData, err := whatsmeow.DecryptMediaRetryNotification(retry, keys.Key)
if err != nil {
log.Warn().Err(err).Msg("Failed to decrypt media retry notification")
return
} else if retryData.GetResult() != waMmsRetry.MediaRetryNotification_SUCCESS {
errorName := waMmsRetry.MediaRetryNotification_ResultType_name[int32(retryData.GetResult())]
if retryData.GetDirectPath() == "" {
log.Warn().Str("error_name", errorName).Msg("Got error response in media retry notification")
log.Debug().Any("error_content", retryData).Msg("Full error response content")
return
}
log.Debug().Msg("Got error response in media retry notification, but response also contains a new download URL")
}
keys.DirectPath = retryData.GetDirectPath()
msg.Metadata.(*waid.MessageMetadata).DirectMediaMeta, err = json.Marshal(keys)
if err != nil {
log.Err(err).Msg("Failed to marshal updated direct media metadata")
return
}
err = wa.Main.Bridge.DB.Message.Update(ctx, msg)
if err != nil {
log.Err(err).Msg("Failed to update message with new direct media metadata")
}
if state != nil {
state.resultURL = retryData.GetDirectPath()
state.wait.Set()
}
}
9 changes: 6 additions & 3 deletions pkg/connector/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,16 @@ func (evt *WAMediaRetry) makeErrorEdit(part *database.Message, meta *msgconv.Pre

func (evt *WAMediaRetry) ConvertEdit(ctx context.Context, portal *bridgev2.Portal, intent bridgev2.MatrixAPI, existing []*database.Message) (*bridgev2.ConvertedEdit, error) {
meta := existing[0].Metadata.(*waid.MessageMetadata)
if meta.Error != waid.MsgErrMediaNotFound {
if meta.DirectMediaMeta != nil {
evt.wa.receiveDirectMediaRetry(ctx, existing[0], evt.MediaRetry)
return nil, fmt.Errorf("%w: direct media retry", bridgev2.ErrIgnoringRemoteEvent)
} else if meta.Error != waid.MsgErrMediaNotFound {
return nil, fmt.Errorf("%w: message doesn't have media error", bridgev2.ErrIgnoringRemoteEvent)
} else if meta.MediaMeta == nil {
} else if meta.FailedMediaMeta == nil {
return nil, fmt.Errorf("%w: message doesn't have media metadata", bridgev2.ErrIgnoringRemoteEvent)
}
var mediaMeta msgconv.PreparedMedia
err := json.Unmarshal(meta.MediaMeta, &mediaMeta)
err := json.Unmarshal(meta.FailedMediaMeta, &mediaMeta)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal media metadata: %w", err)
}
Expand Down
37 changes: 19 additions & 18 deletions pkg/connector/mediarequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package connector

import (
"context"
"fmt"
"time"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -115,17 +116,9 @@ func (wa *WhatsAppClient) sendMediaRequests(ctx context.Context) {
}

func (wa *WhatsAppClient) sendMediaRequest(ctx context.Context, req *wadb.MediaRequest) {
msgID, err := waid.ParseMessageID(req.MessageID)
if err != nil {
err = wa.Main.DB.MediaRequest.Delete(ctx, wa.UserLogin.ID, req.MessageID)
if err != nil {
zerolog.Ctx(ctx).Err(err).Str("message_id", string(req.MessageID)).Msg("Failed to delete invalid media request")
}
return
}
log := zerolog.Ctx(ctx).With().Str("action", "send media request").Str("message_id", string(req.MessageID)).Logger()
defer func() {
err = wa.Main.DB.MediaRequest.Put(ctx, req)
err := wa.Main.DB.MediaRequest.Put(ctx, req)
if err != nil {
log.Err(err).Msg("Failed to save media request status")
}
Expand All @@ -144,15 +137,7 @@ func (wa *WhatsAppClient) sendMediaRequest(ctx context.Context, req *wadb.MediaR
req.Status = wadb.MediaBackfillRequestStatusRequestSkipped
return
}
err = wa.Client.SendMediaRetryReceipt(&types.MessageInfo{
ID: msgID.ID,
MessageSource: types.MessageSource{
IsFromMe: msgID.Sender.User == wa.JID.User,
IsGroup: msgID.Chat.Server != types.DefaultUserServer,
Sender: msgID.Sender,
Chat: msgID.Chat,
},
}, req.MediaKey)
err = wa.sendMediaRequestDirect(req.MessageID, req.MediaKey)
if err != nil {
log.Err(err).Msg("Failed to send media retry request")
req.Status = wadb.MediaBackfillRequestStatusRequestFailed
Expand All @@ -162,3 +147,19 @@ func (wa *WhatsAppClient) sendMediaRequest(ctx context.Context, req *wadb.MediaR
req.Status = wadb.MediaBackfillRequestStatusRequested
}
}

func (wa *WhatsAppClient) sendMediaRequestDirect(rawMsgID networkid.MessageID, key []byte) error {
msgID, err := waid.ParseMessageID(rawMsgID)
if err != nil {
return fmt.Errorf("failed to parse message ID: %w", err)
}
return wa.Client.SendMediaRetryReceipt(&types.MessageInfo{
ID: msgID.ID,
MessageSource: types.MessageSource{
IsFromMe: msgID.Sender.User == wa.JID.User,
IsGroup: msgID.Chat.Server != types.DefaultUserServer,
Sender: msgID.Sender,
Chat: msgID.Chat,
},
}, key)
}
Loading

0 comments on commit 8d042cc

Please sign in to comment.