@@ -12,6 +12,7 @@ import (
12
12
"github.com/xmtp/xmtpd/pkg/db/queries"
13
13
envUtils "github.com/xmtp/xmtpd/pkg/envelopes"
14
14
clientInterceptors "github.com/xmtp/xmtpd/pkg/interceptors/client"
15
+ "github.com/xmtp/xmtpd/pkg/misbehavior"
15
16
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes"
16
17
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/message_api"
17
18
"github.com/xmtp/xmtpd/pkg/registrant"
@@ -31,6 +32,7 @@ type syncWorker struct {
31
32
subscriptions map [uint32 ]struct {}
32
33
subscriptionsMutex sync.RWMutex
33
34
cancel context.CancelFunc
35
+ misbehaviorService misbehavior.MisbehaviorService
34
36
}
35
37
36
38
type originatorStream struct {
@@ -58,14 +60,15 @@ func startSyncWorker(
58
60
ctx , cancel := context .WithCancel (ctx )
59
61
60
62
s := & syncWorker {
61
- ctx : ctx ,
62
- log : log .Named ("syncWorker" ),
63
- nodeRegistry : nodeRegistry ,
64
- registrant : registrant ,
65
- store : store ,
66
- wg : sync.WaitGroup {},
67
- subscriptions : make (map [uint32 ]struct {}),
68
- cancel : cancel ,
63
+ ctx : ctx ,
64
+ log : log .Named ("syncWorker" ),
65
+ nodeRegistry : nodeRegistry ,
66
+ registrant : registrant ,
67
+ store : store ,
68
+ wg : sync.WaitGroup {},
69
+ subscriptions : make (map [uint32 ]struct {}),
70
+ cancel : cancel ,
71
+ misbehaviorService : misbehavior .NewLoggingMisbehaviorService (log ),
69
72
}
70
73
if err := s .start (); err != nil {
71
74
return nil , err
@@ -362,8 +365,19 @@ func (s *syncWorker) validateAndInsertEnvelope(
362
365
lastNs = stream .lastEnvelope .OriginatorNs ()
363
366
}
364
367
if env .OriginatorSequenceID () != lastSequenceID + 1 || env .OriginatorNs () < lastNs {
365
- // TODO(rich) Submit misbehavior report and continue
366
- s .log .Error ("Received out of order envelope" )
368
+ if report , err := misbehavior .NewSafetyFailureReport (
369
+ stream .nodeID ,
370
+ message_api .Misbehavior_MISBEHAVIOR_OUT_OF_ORDER ,
371
+ true ,
372
+ []* envUtils.OriginatorEnvelope {stream .lastEnvelope , env },
373
+ ); err == nil {
374
+ if err = s .misbehaviorService .SafetyFailure (report ); err != nil {
375
+ s .log .Debug ("Failed to submit misbehavior report" , zap .Error (err ))
376
+ }
377
+ } else {
378
+ s .log .Debug ("Failed to create misbehavior report" , zap .Error (err ))
379
+ }
380
+
367
381
}
368
382
369
383
if env .OriginatorSequenceID () > lastSequenceID {
0 commit comments