Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: Use atomic types #101

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (t *UDPv4) RequestENR(n *enode.Node) (*enode.Node, error) {
return nil, err
}
if respN.ID() != n.ID() {
return nil, fmt.Errorf("invalid ID in response record")
return nil, errors.New("invalid ID in response record")
}
if respN.Seq() < n.Seq() {
return n, nil // response record is older
Expand Down
2 changes: 1 addition & 1 deletion p2p/discover/v5_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func (t *UDPv5) verifyResponseNode(c *callV5, r *enr.Record, distances []uint, s
}
}
if _, ok := seen[node.ID()]; ok {
return nil, fmt.Errorf("duplicate record")
return nil, errors.New("duplicate record")
}
seen[node.ID()] = struct{}{}
return node, nil
Expand Down
4 changes: 2 additions & 2 deletions p2p/discover/v5wire/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,11 @@ func (c *Codec) makeHandshakeAuth(toID enode.ID, addr string, challenge *Whoarey
// key is part of the ID nonce signature.
var remotePubkey = new(ecdsa.PublicKey)
if err := challenge.Node.Load((*enode.Secp256k1)(remotePubkey)); err != nil {
return nil, nil, fmt.Errorf("can't find secp256k1 key for recipient")
return nil, nil, errors.New("can't find secp256k1 key for recipient")
}
ephkey, err := c.sc.ephemeralKeyGen()
if err != nil {
return nil, nil, fmt.Errorf("can't generate ephemeral key")
return nil, nil, errors.New("can't generate ephemeral key")
}
ephpubkey := EncodePubkey(&ephkey.PublicKey)
auth.pubkey = ephpubkey[:]
Expand Down
3 changes: 2 additions & 1 deletion p2p/dnsdisc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dnsdisc
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -194,7 +195,7 @@ func (c *Client) resolveEntry(ctx context.Context, domain, hash string) (entry,
func (c *Client) doResolveEntry(ctx context.Context, domain, hash string) (entry, error) {
wantHash, err := b32format.DecodeString(hash)
if err != nil {
return nil, fmt.Errorf("invalid base32 hash")
return nil, errors.New("invalid base32 hash")
}
name := hash + "." + domain
txts, err := c.cfg.Resolver.LookupTXT(ctx, hash+"."+domain)
Expand Down
4 changes: 2 additions & 2 deletions p2p/enode/idscheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package enode

import (
"crypto/ecdsa"
"fmt"
"errors"
"io"

"github.com/ethereum/go-ethereum/common/math"
Expand Down Expand Up @@ -66,7 +66,7 @@ func (V4ID) Verify(r *enr.Record, sig []byte) error {
if err := r.Load(&entry); err != nil {
return err
} else if len(entry) != 33 {
return fmt.Errorf("invalid public key")
return errors.New("invalid public key")
}

h := sha3.NewLegacyKeccak256()
Expand Down
11 changes: 5 additions & 6 deletions p2p/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func MsgPipe() (*MsgPipeRW, *MsgPipeRW) {
var (
c1, c2 = make(chan Msg), make(chan Msg)
closing = make(chan struct{})
closed = new(int32)
closed = new(atomic.Bool)
rw1 = &MsgPipeRW{c1, c2, closing, closed}
rw2 = &MsgPipeRW{c2, c1, closing, closed}
)
Expand All @@ -178,13 +178,13 @@ type MsgPipeRW struct {
w chan<- Msg
r <-chan Msg
closing chan struct{}
closed *int32
closed *atomic.Bool
}

// WriteMsg sends a message on the pipe.
// It blocks until the receiver has consumed the message payload.
func (p *MsgPipeRW) WriteMsg(msg Msg) error {
if atomic.LoadInt32(p.closed) == 0 {
if !p.closed.Load() {
consumed := make(chan struct{}, 1)
msg.Payload = &eofSignal{msg.Payload, msg.Size, consumed}
select {
Expand All @@ -205,7 +205,7 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error {

// ReadMsg returns a message sent on the other end of the pipe.
func (p *MsgPipeRW) ReadMsg() (Msg, error) {
if atomic.LoadInt32(p.closed) == 0 {
if !p.closed.Load() {
select {
case msg := <-p.r:
return msg, nil
Expand All @@ -219,9 +219,8 @@ func (p *MsgPipeRW) ReadMsg() (Msg, error) {
// of the pipe. They will return ErrPipeClosed. Close also
// interrupts any reads from a message payload.
func (p *MsgPipeRW) Close() error {
if atomic.AddInt32(p.closed, 1) != 1 {
if p.closed.Swap(true) {
// someone else is already closing
atomic.StoreInt32(p.closed, 1) // avoid overflow
return nil
}
close(p.closing)
Expand Down
3 changes: 2 additions & 1 deletion p2p/nat/natpmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package nat

import (
"errors"
"fmt"
"net"
"strings"
Expand Down Expand Up @@ -46,7 +47,7 @@ func (n *pmp) ExternalIP() (net.IP, error) {

func (n *pmp) AddMapping(protocol string, extport, intport int, name string, lifetime time.Duration) error {
if lifetime <= 0 {
return fmt.Errorf("lifetime must not be <= 0")
return errors.New("lifetime must not be <= 0")
}
// Note order of port arguments is switched between our
// AddMapping and the client's AddPortMapping.
Expand Down
5 changes: 2 additions & 3 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"crypto/ecdsa"
"encoding/hex"
"errors"
"fmt"
"net"
"sort"
"sync"
Expand Down Expand Up @@ -913,13 +912,13 @@ func (srv *Server) checkInboundConn(remoteIP net.IP) error {
}
// Reject connections that do not match NetRestrict.
if srv.NetRestrict != nil && !srv.NetRestrict.Contains(remoteIP) {
return fmt.Errorf("not whitelisted in NetRestrict")
return errors.New("not whitelisted in NetRestrict")
}
// Reject Internet peers that try too often.
now := srv.clock.Now()
srv.inboundHistory.expire(now, nil)
if !netutil.IsLAN(remoteIP) && srv.inboundHistory.contains(remoteIP.String()) {
return fmt.Errorf("too many attempts")
return errors.New("too many attempts")
}
srv.inboundHistory.add(remoteIP.String(), now.Add(inboundThrottleTime))
return nil
Expand Down
2 changes: 1 addition & 1 deletion p2p/simulations/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func startExecNodeStack() (*node.Node, error) {
// decode the config
confEnv := os.Getenv(envNodeConfig)
if confEnv == "" {
return nil, fmt.Errorf("missing " + envNodeConfig)
return nil, fmt.Errorf("missing %s", envNodeConfig)
}
var conf execNodeConfig
if err := json.Unmarshal([]byte(confEnv), &conf); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion p2p/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package p2p
import (
"bytes"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -152,7 +153,7 @@ func readProtocolHandshake(rw MsgReader) (*protoHandshake, error) {
return nil, err
}
if msg.Size > baseProtocolMaxMsgSize {
return nil, fmt.Errorf("message too big")
return nil, errors.New("message too big")
}
if msg.Code == discMsg {
// Disconnect before protocol handshake is valid according to the
Expand Down
Loading