package webmail
// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
import (
"compress/gzip"
"context"
cryptrand "crypto/rand"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"path/filepath"
"reflect"
"runtime/debug"
"slices"
"strconv"
"strings"
"sync"
"time"
"github.com/mjl-/bstore"
"github.com/mjl-/sherpa"
"github.com/mjl-/mox/dns"
"github.com/mjl-/mox/message"
"github.com/mjl-/mox/metrics"
"github.com/mjl-/mox/mlog"
"github.com/mjl-/mox/mox-"
"github.com/mjl-/mox/moxio"
"github.com/mjl-/mox/moxvar"
"github.com/mjl-/mox/smtp"
"github.com/mjl-/mox/store"
)
// Request is a request to an SSE connection to send messages, either for a new
// view, to continue with an existing view, or to cancel an ongoing request.
type Request struct {
ID int64
SSEID int64 // SSE connection.
// To indicate a request is a continuation (more results) of the previous view.
// Echoed in events, client checks if it is getting results for the latest request.
ViewID int64
// If set, this request and its view are canceled. A new view must be started.
Cancel bool
Query Query
Page Page
}
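// For the initial query, serveEvents (below) decodes a Request from the
// "request" query string parameter as JSON; field names follow the Go field
// names above, since there are no json struct tags. A minimal sketch of such a
// value, with purely illustrative IDs and mailbox name:
//
//	{
//		"ID": 1,
//		"SSEID": 0,
//		"ViewID": 1,
//		"Cancel": false,
//		"Query": {
//			"OrderAsc": false,
//			"Threading": "on",
//			"Filter": {"MailboxName": "Inbox"},
//			"NotFilter": {}
//		},
//		"Page": {"Count": 50}
//	}
//
// serveEvents rejects unknown fields (DisallowUnknownFields) and requires
// Page.Count > 0.
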
type ThreadMode string
const (
ThreadOff ThreadMode = "off"
ThreadOn ThreadMode = "on"
ThreadUnread ThreadMode = "unread"
)
// Query is a request for messages that match filters, in a given order.
type Query struct {
OrderAsc bool // Order by received ascending or descending.
Threading ThreadMode
Filter Filter
NotFilter NotFilter
}
// AttachmentType is for filtering by attachment type.
type AttachmentType string
const (
AttachmentIndifferent AttachmentType = ""
AttachmentNone AttachmentType = "none"
AttachmentAny AttachmentType = "any"
AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
AttachmentPDF AttachmentType = "pdf"
AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
AttachmentDocument AttachmentType = "document" // odt, docx, ...
AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
)
// Filter selects the messages to return. Fields that are set must all match;
// for slices, each element must match ("and").
type Filter struct {
// If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
MailboxID int64
// If true, submailboxes are also included in the search.
MailboxChildrenIncluded bool
// In case client doesn't know mailboxes and their IDs yet. Only used during sse
// connection setup, where it is turned into a MailboxID. Filtering only looks at
// MailboxID.
MailboxName string
Words []string // Case insensitive substring match for each string.
From []string
To []string // Including Cc and Bcc.
Oldest *time.Time
Newest *time.Time
Subject []string
Attachments AttachmentType
Labels []string
Headers [][2]string // A header value can be empty, in which case only the presence of the header is checked, regardless of value.
SizeMin int64
SizeMax int64
}
// NotFilter matches messages that don't match these fields.
type NotFilter struct {
Words []string
From []string
To []string
Subject []string
Attachments AttachmentType
Labels []string
}
// Page holds pagination parameters for a request.
type Page struct {
// Start returning messages after this ID, if > 0. For pagination, fetching the
// next set of messages.
AnchorMessageID int64
// Number of messages to return. Must be >= 1; we never return more than 10000 for
// one request.
Count int
// If > 0, return messages until DestMessageID is found. More than Count messages
// can be returned. For long-running searches, it may take a while before this
// message is found.
DestMessageID int64
}
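// A hedged sketch of the intended pagination flow, with illustrative numbers:
// the first request for a view leaves AnchorMessageID at 0 and asks for e.g.
// Count 50; a followup request with the same ViewID sets AnchorMessageID to the
// ID of the last message received, to fetch the next batch. DestMessageID is
// for jumping to a specific message: the server keeps returning messages,
// possibly more than Count, until that message has been included.
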
// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
// included.
type MessageAddress struct {
Name string // Free-form name for display in mail applications.
User string // Localpart, encoded.
Domain dns.Domain
}
// MessageEnvelope is like message.Envelope, as used in message.Part, but including
// unicode host names for IDNA names.
type MessageEnvelope struct {
// todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
Date time.Time
Subject string
From []MessageAddress
Sender []MessageAddress
ReplyTo []MessageAddress
To []MessageAddress
CC []MessageAddress
BCC []MessageAddress
InReplyTo string
MessageID string
}
// MessageItem is sent by queries. It has derived information analyzed from
// message.Part, made for the needs of the message items in the message list.
type MessageItem struct {
Message store.Message // Without ParsedBuf and MsgPrefix, for size.
Envelope MessageEnvelope
Attachments []Attachment
IsSigned bool
IsEncrypted bool
FirstLine string // Of message body, for showing as preview.
MatchQuery bool // If message does not match query, it can still be included because of threading.
MoreHeaders [][2]string // All headers from store.Settings.ShowHeaders that are present.
}
// ParsedMessage has more parsed/derived information about a message, intended
// for rendering the (contents of the) message. Information from MessageItem is
// not duplicated.
type ParsedMessage struct {
ID int64
Part message.Part
Headers map[string][]string
ViewMode store.ViewMode
// Text parts, can be empty.
Texts []string
// Whether there is an HTML part. The webclient renders HTML message parts through
// an iframe and a separate request with strict CSP headers to prevent script
// execution and loading of external resources, which isn't possible when loading
// inline HTML in an iframe, because not all browsers support the iframe csp
// attribute.
HasHTML bool
ListReplyAddress *MessageAddress // From List-Post.
// Information used by MessageItem, not exported in this type.
envelope MessageEnvelope
attachments []Attachment
isSigned bool
isEncrypted bool
firstLine string
}
// EventStart is the first message sent on an SSE connection, giving the client
// basic data to populate its UI. After this event, messages will follow quickly in
// an EventViewMsgs event.
type EventStart struct {
SSEID int64
LoginAddress MessageAddress
Addresses []MessageAddress
DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
MailboxName string
Mailboxes []store.Mailbox
RejectsMailbox string
Settings store.Settings
AccountPath string // If nonempty, the path on same host to webaccount interface.
Version string
}
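// The wire framing is implemented by eventWriter, which is not part of this
// section. Assuming the standard server-sent-events format implied by the
// xsendEvent calls and the ": keepalive" lines below, the start of a connection
// would look roughly like this (JSON payload abbreviated):
//
//	event: start
//	data: {"SSEID":1,"LoginAddress":{...},"Mailboxes":[...],"Version":"..."}
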
// DomainAddressConfig has the address (localpart) configuration for a domain, so
// the webmail client can decide if an address matches the addresses of the
// account.
type DomainAddressConfig struct {
LocalpartCatchallSeparator string // Can be empty.
LocalpartCaseSensitive bool
}
// EventViewMsgs contains messages for a view, possibly a continuation of an
// earlier list of messages.
type EventViewMsgs struct {
ViewID int64
RequestID int64
// If empty, this was the last message for the request. If non-empty, a list of
// thread messages. Each with the first message being the reason this thread is
// included and can be used as AnchorID in followup requests. If the threading mode
// is "off" in the query, there will always be only a single message. If a thread
// is sent, all messages in the thread are sent, including those that don't match
// the query (e.g. from another mailbox). Threads can be displayed based on the
// ThreadParentIDs field, with possibly slightly different display based on field
// ThreadMissingLink.
MessageItems [][]MessageItem
// If set, will match the target page.DestMessageID from the request.
ParsedMessage *ParsedMessage
// If set, there are no more messages in this view at this moment. Messages can be
// added, typically via Change messages, e.g. for new deliveries.
ViewEnd bool
}
// EventViewErr indicates an error during a query for messages. The request is
// aborted, no more request-related messages will be sent until the next request.
type EventViewErr struct {
ViewID int64
RequestID int64
Err string // To be displayed in client.
err error // Original error, for checking against context.Canceled.
}
// EventViewReset indicates that a request for the next set of messages in a view
// could not be fulfilled, e.g. because the anchor message does not exist anymore.
// The client should clear its list of messages. This can happen before
// EventViewMsgs events are sent.
type EventViewReset struct {
ViewID int64
RequestID int64
}
// EventViewChanges contains one or more changes relevant for the client, either
// with new mailbox total/unseen message counts, or messages added/removed/modified
// (flags) for the current view.
type EventViewChanges struct {
ViewID int64
Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
}
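// On the wire, each element of Changes is encoded as a two-element JSON array:
// the tag string followed by the change object, as assembled in xprocessChanges
// in serveEvents below. For example (change fields elided):
//
//	{"ViewID": 1, "Changes": [["ChangeMsgFlags", {/* ChangeMsgFlags fields */}]]}
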
// ChangeMsgAdd adds a new message and possibly its thread to the view.
type ChangeMsgAdd struct {
store.ChangeAddUID
MessageItems []MessageItem
}
// ChangeMsgRemove removes one or more messages from the view.
type ChangeMsgRemove struct {
store.ChangeRemoveUIDs
}
// ChangeMsgFlags updates flags for one message.
type ChangeMsgFlags struct {
store.ChangeFlags
}
// ChangeMsgThread updates muted/collapsed fields for one message.
type ChangeMsgThread struct {
store.ChangeThread
}
// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
type ChangeMailboxRemove struct {
store.ChangeRemoveMailbox
}
// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
type ChangeMailboxAdd struct {
Mailbox store.Mailbox
}
// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
// It could be under a new parent.
type ChangeMailboxRename struct {
store.ChangeRenameMailbox
}
// ChangeMailboxCounts sets new total and unseen message counts for a mailbox.
type ChangeMailboxCounts struct {
store.ChangeMailboxCounts
}
// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
type ChangeMailboxSpecialUse struct {
store.ChangeMailboxSpecialUse
}
// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
// a message was added with a keyword that wasn't in the mailbox yet.
type ChangeMailboxKeywords struct {
store.ChangeMailboxKeywords
}
// View holds the information about the returned data for a query. It is used to
// determine whether mailbox changes should be sent to the client: we only send
// addition/removal/flag-changes of messages that are in view, or that would extend it
// if the view is at the end of the results.
type view struct {
Request Request
// Received of last message we sent to the client. We use it to decide if a newly
// delivered message is within the view and the client should get a notification.
LastMessageReceived time.Time
// If set, the last message in the query view has been sent. There is no need to do
// another query, it will not return more data. Used to decide if an event for a
// new message should be sent.
End bool
// Whether message must or must not match mailboxIDs.
matchMailboxIDs bool
// Mailboxes to match, can be multiple, for matching children. If empty, there is
// no filter on mailboxes.
mailboxIDs map[int64]bool
// Threads sent to client. New messages for this thread are also sent, regardless
// of regular query matching, so also for other mailboxes. If the user (re)moved
// all messages of a thread, they may still receive events for the thread. Only
// filled when query with threading not off.
threadIDs map[int64]struct{}
}
// sses tracks all sse connections, and access to them.
var sses = struct {
sync.Mutex
gen int64
m map[int64]sse
}{m: map[int64]sse{}}
// sse represents an sse connection.
type sse struct {
ID int64 // Also returned in EventStart and used in Request to identify the request.
AccountName string // Used to check the authenticated user has access to the SSE connection.
Request chan Request // Goroutine will receive requests from here, coming from API calls.
}
// unregister is called by the goroutine when the connection is closed or breaks.
func (sse sse) unregister() {
sses.Lock()
defer sses.Unlock()
delete(sses.m, sse.ID)
// Drain any pending requests, preventing blocked goroutines from API calls.
for {
select {
case <-sse.Request:
default:
return
}
}
}
func sseRegister(accountName string) sse {
sses.Lock()
defer sses.Unlock()
sses.gen++
v := sse{sses.gen, accountName, make(chan Request, 1)}
sses.m[v.ID] = v
return v
}
// sseGet returns a reference to an existing connection if it exists and the user
// has access.
func sseGet(id int64, accountName string) (sse, bool) {
sses.Lock()
defer sses.Unlock()
s := sses.m[id]
if s.AccountName != accountName {
return sse{}, false
}
return s, true
}
// ssetoken is a temporary token that has not yet been used to start an SSE
// connection. Created by Token, consumed by a new SSE connection.
type ssetoken struct {
token string // Uniquely generated.
accName string
address string // Address used to authenticate in call that created the token.
sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
validUntil time.Time
}
// ssetokens maintains unused tokens. We have just one, but it's a type so we
// can define methods.
type ssetokens struct {
sync.Mutex
accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
tokens map[string]ssetoken // Token to details, for finding account for a token.
}
var sseTokens = ssetokens{
accountTokens: map[string][]ssetoken{},
tokens: map[string]ssetoken{},
}
// xgenerate creates and saves a new token. It ensures no more than 10 tokens
// per account exist, removing old ones if needed.
func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
buf := make([]byte, 16)
_, err := cryptrand.Read(buf)
xcheckf(ctx, err, "generating token")
st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
x.Lock()
defer x.Unlock()
n := len(x.accountTokens[accName])
if n >= 10 {
for _, ost := range x.accountTokens[accName][:n-9] {
delete(x.tokens, ost.token)
}
copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
x.accountTokens[accName] = x.accountTokens[accName][:9]
}
x.accountTokens[accName] = append(x.accountTokens[accName], st)
x.tokens[st.token] = st
return st.token
}
// check verifies a token, and consumes it if valid.
func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
x.Lock()
defer x.Unlock()
st, ok := x.tokens[token]
if !ok {
return "", "", "", false, nil
}
delete(x.tokens, token)
if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
return "", "", "", false, errors.New("internal error, could not find token in account")
} else {
copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
if len(x.accountTokens[st.accName]) == 0 {
delete(x.accountTokens, st.accName)
}
}
if time.Now().After(st.validUntil) {
return "", "", "", false, nil
}
return st.accName, st.address, st.sessionToken, true, nil
}
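// A hedged sketch of how xgenerate and check pair up; accName, address and
// sessionToken are placeholders. In practice xgenerate is called from the
// authenticated Token API call and check from serveEvents when the client
// connects with "?singleUseToken=...":
//
//	token := sseTokens.xgenerate(ctx, accName, address, sessionToken)
//	// ... client connects with ?singleUseToken=<token> within a minute ...
//	name, addr, sessTok, ok, err := sseTokens.check(token)
//	if err != nil || !ok {
//		// Unknown, already-used or expired token: reject the connection.
//	}
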
// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
type ioErr struct {
err error
}
// ensureMoreHeaders ensures we have a non-nil moreHeaders, taking it from Settings.
func ensureMoreHeaders(tx *bstore.Tx, moreHeaders []string) ([]string, error) {
if moreHeaders != nil {
return moreHeaders, nil
}
s := store.Settings{ID: 1}
if err := tx.Get(&s); err != nil {
return nil, fmt.Errorf("get settings: %v", err)
}
moreHeaders = s.ShowHeaders
if moreHeaders == nil {
moreHeaders = []string{} // Ensure we won't get Settings again next call.
}
return moreHeaders, nil
}
// serveEvents serves an SSE connection. Authentication is done through a query
// string parameter "singleUseToken", a one-time-use token returned by the Token
// API call.
func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
log.Error("internal error: ResponseWriter not a http.Flusher")
http.Error(w, "500 - internal error - cannot sync to http connection", 500)
return
}
q := r.URL.Query()
token := q.Get("singleUseToken")
if token == "" {
http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
return
}
accName, address, sessionToken, ok, err := sseTokens.check(token)
if err != nil {
http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
return
}
if !ok {
http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
return
}
if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
return
}
// We can simulate a slow SSE connection. It seems Firefox doesn't slow down
// incoming responses with its slow-network simulation.
var waitMin, waitMax time.Duration
waitMinMsec := q.Get("waitMinMsec")
waitMaxMsec := q.Get("waitMaxMsec")
if waitMinMsec != "" && waitMaxMsec != "" {
if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
return
} else {
waitMin = time.Duration(v) * time.Millisecond
}
if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
return
} else {
waitMax = time.Duration(v) * time.Millisecond
}
}
// Parse the request with initial mailbox/search criteria.
var req Request
dec := json.NewDecoder(strings.NewReader(q.Get("request")))
dec.DisallowUnknownFields()
if err := dec.Decode(&req); err != nil {
http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
return
} else if req.Page.Count <= 0 {
http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
return
}
if req.Query.Threading == "" {
req.Query.Threading = ThreadOff
}
var writer *eventWriter
metricSSEConnections.Inc()
defer metricSSEConnections.Dec()
// Below here, errors are handled through xcheckf, which panics with
// *sherpa.Error, after which we send an error event to the client. We can also get
// an ioErr when the connection is broken.
defer func() {
x := recover()
if x == nil {
return
}
if err, ok := x.(*sherpa.Error); ok {
writer.xsendEvent(ctx, log, "fatalErr", err.Message)
} else if _, ok := x.(ioErr); ok {
return
} else {
log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
debug.PrintStack()
metrics.PanicInc(metrics.Webmail)
panic(x)
}
}()
h := w.Header()
h.Set("Content-Type", "text/event-stream")
h.Set("Cache-Control", "no-cache")
// We'll be sending quite a bit of message data (text) in JSON (plenty of duplicate
// keys), so it should be quite compressible.
var out writeFlusher
gz := mox.AcceptsGzip(r)
if gz {
h.Set("Content-Encoding", "gzip")
out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
} else {
out = nopFlusher{w}
}
out = httpFlusher{out, flusher}
// We'll be writing outgoing SSE events through writer.
writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
defer writer.close()
// Fetch initial data.
acc, err := store.OpenAccount(log, accName)
xcheckf(ctx, err, "open account")
defer func() {
err := acc.Close()
log.Check(err, "closing account")
}()
comm := store.RegisterComm(acc)
defer comm.Unregister()
// List addresses that the client can use to send email from.
accConf, _ := acc.Conf()
loginAddr, err := smtp.ParseAddress(address)
xcheckf(ctx, err, "parsing login address")
_, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false)
xcheckf(ctx, err, "looking up destination for login address")
loginName := accConf.FullName
if dest.FullName != "" {
loginName = dest.FullName
}
loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
var addresses []MessageAddress
for a, dest := range accConf.Destinations {
name := dest.FullName
if name == "" {
name = accConf.FullName
}
var ma MessageAddress
if strings.HasPrefix(a, "@") {
dom, err := dns.ParseDomain(a[1:])
xcheckf(ctx, err, "parsing destination address for account")
ma = MessageAddress{Domain: dom}
} else {
addr, err := smtp.ParseAddress(a)
xcheckf(ctx, err, "parsing destination address for account")
ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
}
addresses = append(addresses, ma)
}
// The user is allowed to send using an alias address as the message From address.
// Webmail will choose it when replying to a message sent to that address.
aliasAddrs := map[MessageAddress]bool{}
for _, a := range accConf.Aliases {
if a.Alias.AllowMsgFrom {
ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
if !aliasAddrs[ma] {
addresses = append(addresses, ma)
}
aliasAddrs[ma] = true
}
}
// We implicitly start a query. We use the reqctx for the transaction, because the
// transaction is passed to the query, which can be canceled.
reqctx, reqctxcancel := context.WithCancel(ctx)
defer func() {
// We also cancel in cancelDrain later on, but there is a brief window where the
// context wouldn't be canceled.
if reqctxcancel != nil {
reqctxcancel()
reqctxcancel = nil
}
}()
// qtx is kept around during connection initialization, until we pass it off to the
// goroutine that starts querying for messages.
var qtx *bstore.Tx
defer func() {
if qtx != nil {
err := qtx.Rollback()
log.Check(err, "rolling back")
}
}()
var mbl []store.Mailbox
settings := store.Settings{ID: 1}
// We only take the rlock when getting the tx.
acc.WithRLock(func() {
// Now a read-only transaction we'll use during the query.
qtx, err = acc.DB.Begin(reqctx, false)
xcheckf(ctx, err, "begin transaction")
mbl, err = bstore.QueryTx[store.Mailbox](qtx).List()
xcheckf(ctx, err, "list mailboxes")
err = qtx.Get(&settings)
xcheckf(ctx, err, "get settings")
})
// Find the designated mailbox if a mailbox name is set, or there are no filters at all.
var zerofilter Filter
var zeronotfilter NotFilter
var mailbox store.Mailbox
var mailboxPrefixes []string
var matchMailboxes bool
mailboxIDs := map[int64]bool{}
mailboxName := req.Query.Filter.MailboxName
if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
if mailboxName == "" {
mailboxName = "Inbox"
}
var inbox store.Mailbox
for _, e := range mbl {
if e.Name == mailboxName {
mailbox = e
}
if e.Name == "Inbox" {
inbox = e
}
}
if mailbox.ID == 0 {
mailbox = inbox
}
if mailbox.ID == 0 {
xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
}
req.Query.Filter.MailboxID = mailbox.ID
req.Query.Filter.MailboxName = ""
mailboxPrefixes = []string{mailbox.Name + "/"}
matchMailboxes = true
mailboxIDs[mailbox.ID] = true
} else {
matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
}
if req.Query.Filter.MailboxChildrenIncluded {
xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
}
// todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
sse := sseRegister(acc.Name)
defer sse.unregister()
// Per-domain localpart config so webclient can decide if an address belongs to the account.
domainAddressConfigs := map[string]DomainAddressConfig{}
for _, a := range addresses {
dom, _ := mox.Conf.Domain(a.Domain)
domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparator, dom.LocalpartCaseSensitive}
}
// Write first event, allowing client to fill its UI with mailboxes.
start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
writer.xsendEvent(ctx, log, "start", start)
// The goroutine doing the querying will send messages on these channels, which
// result in an event being written on the SSE connection.
viewMsgsc := make(chan EventViewMsgs)
viewErrc := make(chan EventViewErr)
viewResetc := make(chan EventViewReset)
donec := make(chan int64) // When request is done.
// Start a view; it determines if we send a change to the client. And start an
// implicit query for messages; we'll send the messages to the client, which can
// fill its UI with messages.
v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
qtx = nil // viewRequestTx closes qtx
// When canceling a query, we must drain its messages until it says it is done.
// Otherwise the sending goroutine would hang indefinitely on a channel send.
cancelDrain := func() {
if reqctxcancel != nil {
// Cancel the goroutine doing the querying.
reqctxcancel()
reqctx = nil
reqctxcancel = nil
} else {
return
}
// Drain events until done.
for {
select {
case <-viewMsgsc:
case <-viewErrc:
case <-viewResetc:
case <-donec:
return
}
}
}
// If we stop and a query is in progress, we must drain the channel it will send on.
defer cancelDrain()
// Changes broadcasted by other connections on this account. If applicable for the
// connection/view, we send events.
xprocessChanges := func(changes []store.Change) {
taggedChanges := [][2]any{}
// We get a transaction first time we need it.
var xtx *bstore.Tx
defer func() {
if xtx != nil {
err := xtx.Rollback()
log.Check(err, "rolling back transaction")
}
}()
ensureTx := func() error {
if xtx != nil {
return nil
}
acc.RLock()
defer acc.RUnlock()
var err error
xtx, err = acc.DB.Begin(ctx, false)
return err
}
// This getmsg will now only be called with mailboxID+UID, not with messageID set.
// todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
if err := ensureTx(); err != nil {
return store.Message{}, fmt.Errorf("transaction: %v", err)
}
return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
}
// Additional headers from settings to add to MessageItems.
var moreHeaders []string
xmoreHeaders := func() []string {
err := ensureTx()
xcheckf(ctx, err, "transaction")
moreHeaders, err = ensureMoreHeaders(xtx, moreHeaders)
xcheckf(ctx, err, "ensuring more headers")
return moreHeaders
}
// Return UIDs that are within range of the view, either because the end has been
// reached or because the UID is not after the last message.
xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
uidsAny := make([]any, len(uids))
for i, uid := range uids {
uidsAny[i] = uid
}
err := ensureTx()
xcheckf(ctx, err, "transaction")
q := bstore.QueryTx[store.Message](xtx)
q.FilterNonzero(store.Message{MailboxID: mailboxID})
q.FilterEqual("UID", uidsAny...)
mbOK := v.matchesMailbox(mailboxID)
err = q.ForEach(func(m store.Message) error {
_, thread := v.threadIDs[m.ThreadID]
if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
changedUIDs = append(changedUIDs, m.UID)
}
return nil
})
xcheckf(ctx, err, "fetching messages for change")
return changedUIDs
}
// Forward changes that are relevant to the current view.
for _, change := range changes {
switch c := change.(type) {
case store.ChangeAddUID:
ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
xcheckf(ctx, err, "matching new message against view")
m, err := getmsg(0, c.MailboxID, c.UID)
xcheckf(ctx, err, "get message")
_, thread := v.threadIDs[m.ThreadID]
if !ok && !thread {
continue
}
state := msgState{acc: acc}
mi, err := messageItem(log, m, &state, xmoreHeaders())
state.clear()
xcheckf(ctx, err, "make messageitem")
mi.MatchQuery = ok
mil := []MessageItem{mi}
if !thread && req.Query.Threading != ThreadOff {
err := ensureTx()
xcheckf(ctx, err, "transaction")
more, _, err := gatherThread(log, xtx, acc, v, m, 0, false, xmoreHeaders())
xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
mil = append(mil, more...)
v.threadIDs[m.ThreadID] = struct{}{}
}
taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
// If message extends the view, store it as such.
if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
v.LastMessageReceived = m.Received
}
case store.ChangeRemoveUIDs:
// We may send changes for uids the client doesn't know, that's fine.
changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
if len(changedUIDs) == 0 {
continue
}
ch := ChangeMsgRemove{c}
ch.UIDs = changedUIDs
taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
case store.ChangeFlags:
// We may send changes for uids the client doesn't know, that's fine.
changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
if len(changedUIDs) == 0 {
continue
}
ch := ChangeMsgFlags{c}
ch.UID = changedUIDs[0]
taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
case store.ChangeThread:
// Change in muted/collapsed state, just always ship it.
taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
case store.ChangeRemoveMailbox:
taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
case store.ChangeAddMailbox:
taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
case store.ChangeRenameMailbox:
taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
case store.ChangeMailboxCounts:
taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
case store.ChangeMailboxSpecialUse:
taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
case store.ChangeMailboxKeywords:
taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
case store.ChangeAddSubscription:
// Webmail does not care about subscriptions.
default:
panic(fmt.Sprintf("missing case for change %T", c))
}
}
if len(taggedChanges) > 0 {
viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
}
}
timer := time.NewTimer(5 * time.Minute) // For keepalives.
defer timer.Stop()
for {
if writer.wrote {
timer.Reset(5 * time.Minute)
writer.wrote = false
}
pending := comm.Pending
if reqctx != nil {
pending = nil
}
select {
case <-mox.Shutdown.Done():
writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
// Work around go vet, it doesn't see defer cancelDrain.
if reqctxcancel != nil {
reqctxcancel()
}
return
case <-timer.C:
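// A line starting with ":" is an SSE comment: clients ignore it, but it keeps
// the connection (and any intermediaries) from idling out.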
_, err := fmt.Fprintf(out, ": keepalive\n\n")
if err != nil {
log.Errorx("write keepalive", err)
// Work around go vet, it doesn't see defer cancelDrain.
if reqctxcancel != nil {
reqctxcancel()