-
Notifications
You must be signed in to change notification settings - Fork 113
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore(dot/network): added common request-response protocol (#3334)
So far, we were using only one request-response protocol (block request and block response). But now, we need to add quite a few more. Just like we did for notification protocol, we need some common code-infrastructure for adding new request-response protocol. This PR introduces interfaces for Request and Response to follow and common functions to make request and decode response.
- Loading branch information
1 parent
457fe39
commit ac52090
Showing
19 changed files
with
244 additions
and
143 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
// Copyright 2021 ChainSafe Systems (ON) | ||
// SPDX-License-Identifier: LGPL-3.0-only | ||
|
||
package network | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/ChainSafe/gossamer/dot/peerset" | ||
libp2pnetwork "github.com/libp2p/go-libp2p/core/network" | ||
"github.com/libp2p/go-libp2p/core/peer" | ||
"github.com/libp2p/go-libp2p/core/protocol" | ||
) | ||
|
||
type RequestMaker interface { | ||
Do(to peer.ID, req Message, res ResponseMessage) error | ||
} | ||
|
||
type RequestResponseProtocol struct { | ||
ctx context.Context | ||
host *host | ||
requestTimeout time.Duration | ||
maxResponseSize uint64 | ||
protocolID protocol.ID | ||
responseBufMu sync.Mutex | ||
responseBuf []byte | ||
} | ||
|
||
func (rrp *RequestResponseProtocol) Do(to peer.ID, req Message, res ResponseMessage) error { | ||
rrp.host.p2pHost.ConnManager().Protect(to, "") | ||
defer rrp.host.p2pHost.ConnManager().Unprotect(to, "") | ||
|
||
ctx, cancel := context.WithTimeout(rrp.ctx, rrp.requestTimeout) | ||
defer cancel() | ||
|
||
stream, err := rrp.host.p2pHost.NewStream(ctx, to, rrp.protocolID) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
defer func() { | ||
err := stream.Close() | ||
if err != nil { | ||
logger.Warnf("failed to close stream: %s", err) | ||
} | ||
}() | ||
|
||
if err = rrp.host.writeToStream(stream, req); err != nil { | ||
return err | ||
} | ||
|
||
return rrp.receiveResponse(stream, res) | ||
} | ||
|
||
func (rrp *RequestResponseProtocol) receiveResponse(stream libp2pnetwork.Stream, msg ResponseMessage) error { | ||
// allocating a new (large) buffer every time slows down receiving response by a dramatic amount, | ||
// as malloc is one of the most CPU intensive tasks. | ||
// thus we should allocate buffers at startup and re-use them instead of allocating new ones each time. | ||
rrp.responseBufMu.Lock() | ||
defer rrp.responseBufMu.Unlock() | ||
|
||
buf := rrp.responseBuf | ||
|
||
n, err := readStream(stream, &buf, rrp.maxResponseSize) | ||
if err != nil { | ||
return fmt.Errorf("read stream error: %w", err) | ||
} | ||
|
||
if n == 0 { | ||
return fmt.Errorf("received empty message") | ||
} | ||
|
||
err = msg.Decode(buf[:n]) | ||
if err != nil { | ||
rrp.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ | ||
Value: peerset.BadMessageValue, | ||
Reason: peerset.BadMessageReason, | ||
}, stream.Conn().RemotePeer()) | ||
return fmt.Errorf("failed to decode block response: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
type ResponseMessage interface { | ||
String() string | ||
Encode() ([]byte, error) | ||
Decode(in []byte) (err error) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.