@@ -12,6 +12,7 @@ import (
12
12
"github.com/xmtp/xmtpd/pkg/db/queries"
13
13
envUtils "github.com/xmtp/xmtpd/pkg/envelopes"
14
14
clientInterceptors "github.com/xmtp/xmtpd/pkg/interceptors/client"
15
+ "github.com/xmtp/xmtpd/pkg/misbehavior"
15
16
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes"
16
17
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/message_api"
17
18
"github.com/xmtp/xmtpd/pkg/registrant"
@@ -31,6 +32,7 @@ type syncWorker struct {
31
32
subscriptions map [uint32 ]struct {}
32
33
subscriptionsMutex sync.RWMutex
33
34
cancel context.CancelFunc
35
+ misbehaviorService misbehavior.MisbehaviorService
34
36
}
35
37
36
38
type originatorStream struct {
@@ -58,14 +60,15 @@ func startSyncWorker(
58
60
ctx , cancel := context .WithCancel (ctx )
59
61
60
62
s := & syncWorker {
61
- ctx : ctx ,
62
- log : log .Named ("syncWorker" ),
63
- nodeRegistry : nodeRegistry ,
64
- registrant : registrant ,
65
- store : store ,
66
- wg : sync.WaitGroup {},
67
- subscriptions : make (map [uint32 ]struct {}),
68
- cancel : cancel ,
63
+ ctx : ctx ,
64
+ log : log .Named ("syncWorker" ),
65
+ nodeRegistry : nodeRegistry ,
66
+ registrant : registrant ,
67
+ store : store ,
68
+ wg : sync.WaitGroup {},
69
+ subscriptions : make (map [uint32 ]struct {}),
70
+ cancel : cancel ,
71
+ misbehaviorService : misbehavior .NewLoggingMisbehaviorService (log ),
69
72
}
70
73
if err := s .start (); err != nil {
71
74
return nil , err
@@ -362,8 +365,10 @@ func (s *syncWorker) validateAndInsertEnvelope(
362
365
lastNs = stream .lastEnvelope .OriginatorNs ()
363
366
}
364
367
if env .OriginatorSequenceID () != lastSequenceID + 1 || env .OriginatorNs () < lastNs {
365
- // TODO(rich) Submit misbehavior report and continue
366
- s .log .Error ("Received out of order envelope" )
368
+ err = s .submitOutOfOrderReport (stream .nodeID , stream .lastEnvelope , env )
369
+ if err != nil {
370
+ s .log .Error ("Failed to submit out of order report" , zap .Error (err ))
371
+ }
367
372
}
368
373
369
374
if env .OriginatorSequenceID () > lastSequenceID {
@@ -375,6 +380,28 @@ func (s *syncWorker) validateAndInsertEnvelope(
375
380
s .insertEnvelope (env )
376
381
}
377
382
383
+ func (s * syncWorker ) submitOutOfOrderReport (
384
+ nodeID uint32 ,
385
+ lastEnvelope * envUtils.OriginatorEnvelope ,
386
+ currentEnvelope * envUtils.OriginatorEnvelope ,
387
+ ) error {
388
+ report , err := misbehavior .NewSafetyFailureReport (
389
+ nodeID ,
390
+ message_api .Misbehavior_MISBEHAVIOR_OUT_OF_ORDER ,
391
+ true ,
392
+ []* envUtils.OriginatorEnvelope {lastEnvelope , currentEnvelope },
393
+ )
394
+ if err != nil {
395
+ return err
396
+ }
397
+
398
+ err = s .misbehaviorService .SafetyFailure (report )
399
+ if err != nil {
400
+ return err
401
+ }
402
+ return nil
403
+ }
404
+
378
405
func (s * syncWorker ) insertEnvelope (env * envUtils.OriginatorEnvelope ) {
379
406
s .log .Debug ("Replication server received envelope" , zap .Any ("envelope" , env ))
380
407
originatorBytes , err := env .Bytes ()
0 commit comments