diff --git a/pkg/pushsync/metrics.go b/pkg/pushsync/metrics.go index e8fe628303b..52be63dfc9a 100644 --- a/pkg/pushsync/metrics.go +++ b/pkg/pushsync/metrics.go @@ -10,17 +10,19 @@ import ( ) type metrics struct { - TotalSent prometheus.Counter - TotalReceived prometheus.Counter - TotalErrors prometheus.Counter - TotalHandlerErrors prometheus.Counter - TotalReplicated prometheus.Counter - TotalReplicatedError prometheus.Counter - TotalSendAttempts prometheus.Counter - TotalFailedSendAttempts prometheus.Counter - TotalSkippedPeers prometheus.Counter - TotalOutgoing prometheus.Counter - TotalOutgoingErrors prometheus.Counter + TotalSent prometheus.Counter + TotalReceived prometheus.Counter + TotalErrors prometheus.Counter + TotalHandlerErrors prometheus.Counter + TotalReplicated prometheus.Counter + TotalReplicatedError prometheus.Counter + TotalSendAttempts prometheus.Counter + TotalFailedSendAttempts prometheus.Counter + TotalSkippedPeers prometheus.Counter + TotalOutgoing prometheus.Counter + TotalOutgoingErrors prometheus.Counter + InvalidStampErrors prometheus.Counter + TotalHandlerReplicationErrors prometheus.Counter } func newMetrics() metrics { @@ -94,6 +96,18 @@ func newMetrics() metrics { Name: "total_outgoing_errors", Help: "Total no of errors of entire operation to sync a chunk (multiple attempts included)", }), + InvalidStampErrors: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "invalid_stamps", + Help: "No of invalid stamp errors.", + }), + TotalHandlerReplicationErrors: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_replication_handlers_errors", + Help: "Total no of errors of pushsync handler neighborhood replication.", + }), } } diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index c3f359955db..715bdc8e58d 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -164,21 +164,26 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) bytes := chunkAddress.Bytes() if dcmp, _ := swarm.DistanceCmp(bytes, p.Address.Bytes(), ps.address.Bytes()); dcmp == 1 { if ps.topologyDriver.IsWithinDepth(chunkAddress) { + ctxd, canceld := context.WithTimeout(context.Background(), timeToWaitForPushsyncToNeighbor) defer canceld() chunk, err = ps.validStamp(chunk, ch.Stamp) if err != nil { + ps.metrics.InvalidStampErrors.Inc() + ps.metrics.TotalHandlerReplicationErrors.Inc() return fmt.Errorf("pushsync valid stamp: %w", err) } _, err = ps.storer.Put(ctxd, storage.ModePutSync, chunk) if err != nil { + ps.metrics.TotalHandlerReplicationErrors.Inc() return fmt.Errorf("chunk store: %w", err) } debit, err := ps.accounting.PrepareDebit(p.Address, price) if err != nil { + ps.metrics.TotalHandlerReplicationErrors.Inc() return fmt.Errorf("prepare debit to peer %s before writeback: %w", p.Address.String(), err) } defer debit.Cleanup() @@ -186,14 +191,20 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) // return back receipt signature, err := ps.signer.Sign(bytes) if err != nil { + ps.metrics.TotalHandlerReplicationErrors.Inc() return fmt.Errorf("receipt signature: %w", err) } receipt := pb.Receipt{Address: bytes, Signature: signature, BlockHash: ps.blockHash} if err := w.WriteMsgWithContext(ctxd, &receipt); err != nil { + ps.metrics.TotalHandlerReplicationErrors.Inc() return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err) } - return debit.Apply() + err = debit.Apply() + if err != nil { + ps.metrics.TotalHandlerReplicationErrors.Inc() + } + return err } return ErrOutOfDepthReplication @@ -206,6 +217,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) chunk, err = ps.validStamp(chunk, ch.Stamp) if err != nil { + ps.metrics.InvalidStampErrors.Inc() return fmt.Errorf("pushsync valid stamp: %w", err) } @@ -227,6 +239,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) chunk, err = ps.validStamp(chunk, ch.Stamp) if err != nil { + ps.metrics.InvalidStampErrors.Inc() return fmt.Errorf("pushsync valid stamp: %w", err) }