diff --git a/docs/Stream-Protocol-Spec.md b/docs/Stream-Protocol-Spec.md index 4c4b59e20c..7b7ee3cc18 100644 --- a/docs/Stream-Protocol-Spec.md +++ b/docs/Stream-Protocol-Spec.md @@ -67,8 +67,8 @@ Wire Protocol Specifications | Msg Name | From->To | Params | Example | | -------- | -------- | -------- | ------- | -| StreamInfoReq | Client->Server | Streams`[]string` | `SYNC\|6, SYNC\|5` | -| StreamInfoRes | Server->Client | Streams`[]StreamDescriptor`
Stream`string`
Cursor`uint64`
Bounded`bool` | `SYNC\|6;CUR=1632;bounded, SYNC\|7;CUR=18433;bounded` | +| StreamInfoReq | Client->Server | Streams`[]ID` | `SYNC\|6, SYNC\|5` | +| StreamInfoRes | Server->Client | Streams`[]StreamDescriptor`
Stream`ID`
Cursor`uint64`
Bounded`bool` | `SYNC\|6;CUR=1632;bounded, SYNC\|7;CUR=18433;bounded` | | GetRange | Client->Server| Ruid`uint`
Stream `string`
From`uint`
To`*uint`(nullable)
Roundtrip`bool` | `Ruid: 21321, Stream: SYNC\|6, From: 1, To: 100`(bounded), Roundtrip: true
`Stream: SYNC\|7, From: 109, Roundtrip: true`(unbounded) | | OfferedHashes | Server->Client| Ruid`uint`
Hashes `[]byte` | `Ruid: 21321, Hashes: [cbcbbaddda, bcbbbdbbdc, ....]` | | WantedHashes | Client->Server | Ruid`uint`
Bitvector`[]byte` | `Ruid: 21321, Bitvector: [0100100100] ` | @@ -81,65 +81,120 @@ Notes: * two notions of bounded - on the stream level and on the localstore * if TO is not specified - we assume unbounded stream, and we just send whatever, until at most, we fill up an entire batch. -### Message struct definitions: +### Message and interface definitions: + + +```go +// StreamProvider interface provides a lightweight abstraction that allows an easily-pluggable +// stream provider as part of the Stream! protocol specification. +type StreamProvider interface { + NeedData(ctx context.Context, key []byte) (need bool, wait func(context.Context) error) + Get(ctx context.Context, addr chunk.Address) ([]byte, error) + Put(ctx context.Context, addr chunk.Address, data []byte) (exists bool, err error) + Subscribe(ctx context.Context, key interface{}, from, to uint64) (<-chan chunk.Descriptor, func()) + Cursor(interface{}) (uint64, error) + RunUpdateStreams(p *Peer) + StreamName() string + ParseKey(string) (interface{}, error) + EncodeKey(interface{}) (string, error) + StreamBehavior() StreamInitBehavior + Boundedness() bool +} +``` + ```go +type StreamInitBehavior int +``` + +```go +// StreamInfoReq is a request to get information about particular streams type StreamInfoReq struct { - Streams []string + Streams []ID } ``` + ```go +// StreamInfoRes is a response to StreamInfoReq with the corresponding stream descriptors type StreamInfoRes struct { - Streams []StreamDescriptor + Streams []StreamDescriptor } ``` + ```go +// StreamDescriptor describes an arbitrary stream type StreamDescriptor struct { - Name string - Cursor uint - Bounded bool + Stream ID + Cursor uint64 + Bounded bool } ``` + ```go +// GetRange is a message sent from the downstream peer to the upstream peer asking for chunks +// within a particular interval for a certain stream type GetRange struct { - Ruid uint - Stream string - From uint - To uint `rlp:nil` - BatchSize uint - Roundtrip bool + Ruid uint + Stream ID + From uint64 + To uint64 `rlp:nil` + BatchSize uint + Roundtrip bool } ``` + ```go +// OfferedHashes is a message sent from the upstream peer to the downstream peer allowing the latter +// to selectively ask for chunks within a particular requested interval type OfferedHashes struct { - Ruid uint - LastIndex uint - Hashes []byte + Ruid uint + LastIndex uint + Hashes []byte } ``` + ```go +// WantedHashes is a message sent from the downstream peer to the upstream peer in response +// to OfferedHashes in order to selectively ask for a particular chunks within an interval type WantedHashes struct { - Ruid uint - BitVector []byte + Ruid uint + BitVector []byte } ``` + ```go +// ChunkDelivery delivers a frame of chunks in response to a WantedHashes message type ChunkDelivery struct { - Ruid uint - LastIndex uint - Chunks [][]byte + Ruid uint + LastIndex uint + Chunks []DeliveredChunk } ``` + ```go -type BatchDone struct { - Ruid uint - Last uint +// DeliveredChunk encapsulates a particular chunk's underlying data within a ChunkDelivery message +type DeliveredChunk struct { + Addr storage.Address + Data []byte } ``` + ```go +// StreamState is a message exchanged between two nodes to notify of changes or errors in a stream's state type StreamState struct { - Stream string - Code uint16 - Message string + Stream ID + Code uint16 + Message string +} +``` + +```go +// Stream defines a unique stream identifier in a textual representation +type ID struct { + // Name is used for the Stream provider identification + Name string + // Key is the name of specific data stream within the stream provider. The semantics of this value + // is at the discretion of the stream provider implementation + Key string } ``` @@ -147,14 +202,14 @@ Message exchange examples: ====== Initial handshake - client queries server for stream states
-![handshake](https://raw.githubusercontent.com/ethersphere/swarm/stream-spec/docs/diagrams/stream-handshake.png) +![handshake](https://raw.githubusercontent.com/ethersphere/swarm/master/docs/diagrams/stream-handshake.png)
GetRange (bounded) - client requests a bounded range within a stream
-![bounded-range](https://raw.githubusercontent.com/ethersphere/swarm/stream-spec/docs/diagrams/stream-bounded.png) +![bounded-range](https://raw.githubusercontent.com/ethersphere/swarm/master/docs/diagrams/stream-bounded.png)
GetRange (unbounded) - client requests an unbounded range (specifies only `From` parameter)
-![unbounded-range](https://raw.githubusercontent.com/ethersphere/swarm/stream-spec/docs/diagrams/stream-unbounded.png) +![unbounded-range](https://raw.githubusercontent.com/ethersphere/swarm/master/docs/diagrams/stream-unbounded.png)
GetRange (no roundtrip) - client requests an unbounded or bounded range with no roundtrip configured
-![unbounded-range](https://raw.githubusercontent.com/ethersphere/swarm/stream-spec/docs/diagrams/stream-no-roundtrip.png) +![unbounded-range](https://raw.githubusercontent.com/ethersphere/swarm/master/docs/diagrams/stream-no-roundtrip.png) diff --git a/network/newstream/common_test.go b/network/newstream/common_test.go new file mode 100644 index 0000000000..ab7eaecbbf --- /dev/null +++ b/network/newstream/common_test.go @@ -0,0 +1,65 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package newstream + +import ( + "flag" + "io/ioutil" + "os" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethersphere/swarm/network" + "github.com/ethersphere/swarm/storage/localstore" + "github.com/ethersphere/swarm/storage/mock" +) + +var ( + loglevel = flag.Int("loglevel", 5, "verbosity of logs") +) + +func init() { + flag.Parse() + + log.PrintOrigins(true) + log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(false)))) +} + +func newTestLocalStore(id enode.ID, addr *network.BzzAddr, globalStore mock.GlobalStorer) (localStore *localstore.DB, cleanup func(), err error) { + dir, err := ioutil.TempDir("", "swarm-stream-") + if err != nil { + return nil, nil, err + } + cleanup = func() { + os.RemoveAll(dir) + } + + var mockStore *mock.NodeStore + if globalStore != nil { + mockStore = globalStore.NewNodeStore(common.BytesToAddress(id.Bytes())) + } + + localStore, err = localstore.New(dir, addr.Over(), &localstore.Options{ + MockStore: mockStore, + }) + if err != nil { + cleanup() + return nil, nil, err + } + return localStore, cleanup, nil +} diff --git a/network/newstream/peer.go b/network/newstream/peer.go new file mode 100644 index 0000000000..dcbd9f5535 --- /dev/null +++ b/network/newstream/peer.go @@ -0,0 +1,100 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package newstream + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/ethersphere/swarm/chunk" + "github.com/ethersphere/swarm/network" + "github.com/ethersphere/swarm/network/bitvector" + "github.com/ethersphere/swarm/state" +) + +var ErrEmptyBatch = errors.New("empty batch") + +const ( + HashSize = 32 + BatchSize = 16 +) + +// Peer is the Peer extension for the streaming protocol +type Peer struct { + *network.BzzPeer + mtx sync.Mutex + providers map[string]StreamProvider + intervalsStore state.Store + + streamCursorsMu sync.Mutex + streamCursors map[string]uint64 // key: Stream ID string representation, value: session cursor. Keeps cursors for all streams. when unset - we are not interested in that bin + dirtyStreams map[string]bool // key: stream ID, value: whether cursors for a stream should be updated + activeBoundedGets map[string]chan struct{} + openWants map[uint]*want // maintain open wants on the client side + openOffers map[uint]offer // maintain open offers on the server side + quit chan struct{} // closed when peer is going offline +} + +// NewPeer is the constructor for Peer +func NewPeer(peer *network.BzzPeer, i state.Store, providers map[string]StreamProvider) *Peer { + p := &Peer{ + BzzPeer: peer, + providers: providers, + intervalsStore: i, + streamCursors: make(map[string]uint64), + dirtyStreams: make(map[string]bool), + openWants: make(map[uint]*want), + openOffers: make(map[uint]offer), + quit: make(chan struct{}), + } + return p +} +func (p *Peer) Left() { + close(p.quit) +} + +// HandleMsg is the message handler that delegates incoming messages +func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { + switch msg := msg.(type) { + default: + return fmt.Errorf("unknown message type: %T", msg) + } + return nil +} + +type offer struct { + ruid uint + stream ID + hashes []byte + requested time.Time +} + +type want struct { + ruid uint + from uint64 + to uint64 + stream ID + hashes map[string]bool + bv *bitvector.BitVector + requested time.Time + remaining uint64 + chunks chan chunk.Chunk + done chan error +} diff --git a/network/newstream/stream.go b/network/newstream/stream.go new file mode 100644 index 0000000000..77b1d8ca7d --- /dev/null +++ b/network/newstream/stream.go @@ -0,0 +1,167 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package newstream + +import ( + "sync" + + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/network" + "github.com/ethersphere/swarm/p2p/protocols" + "github.com/ethersphere/swarm/state" +) + +// SlipStream implements node.Service +var _ node.Service = (*SlipStream)(nil) + +var SyncerSpec = &protocols.Spec{ + Name: "bzz-stream", + Version: 8, + MaxMsgSize: 10 * 1024 * 1024, + Messages: []interface{}{ + StreamInfoReq{}, + StreamInfoRes{}, + GetRange{}, + OfferedHashes{}, + ChunkDelivery{}, + WantedHashes{}, + }, +} + +// SlipStream is the base type that handles all client/server operations on a node +// it is instantiated once per stream protocol instance, that is, it should have +// one instance per node +type SlipStream struct { + mtx sync.RWMutex + intervalsStore state.Store //every protocol would make use of this + peers map[enode.ID]*Peer + kad *network.Kademlia + + providers map[string]StreamProvider + + spec *protocols.Spec //this protocol's spec + balance protocols.Balance //implements protocols.Balance, for accounting + prices protocols.Prices //implements protocols.Prices, provides prices to accounting + + quit chan struct{} // terminates registry goroutines +} + +func NewSlipStream(intervalsStore state.Store, kad *network.Kademlia, providers ...StreamProvider) *SlipStream { + slipStream := &SlipStream{ + intervalsStore: intervalsStore, + kad: kad, + peers: make(map[enode.ID]*Peer), + providers: make(map[string]StreamProvider), + quit: make(chan struct{}), + } + + for _, p := range providers { + slipStream.providers[p.StreamName()] = p + } + + slipStream.spec = SyncerSpec + + return slipStream +} + +func (s *SlipStream) getPeer(id enode.ID) *Peer { + s.mtx.Lock() + defer s.mtx.Unlock() + p := s.peers[id] + return p +} + +func (s *SlipStream) addPeer(p *Peer) { + s.mtx.Lock() + defer s.mtx.Unlock() + s.peers[p.ID()] = p +} + +func (s *SlipStream) removePeer(p *Peer) { + s.mtx.Lock() + defer s.mtx.Unlock() + if _, found := s.peers[p.ID()]; found { + log.Error("removing peer", "id", p.ID()) + delete(s.peers, p.ID()) + p.Left() + } else { + log.Warn("peer was marked for removal but not found", "peer", p.ID()) + } +} + +// Run is being dispatched when 2 nodes connect +func (s *SlipStream) Run(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := protocols.NewPeer(p, rw, s.spec) + bp := network.NewBzzPeer(peer) + + np := network.NewPeer(bp, s.kad) + s.kad.On(np) + defer s.kad.Off(np) + + sp := NewPeer(bp, s.intervalsStore, s.providers) + s.addPeer(sp) + defer s.removePeer(sp) + return peer.Run(sp.HandleMsg) +} + +func (s *SlipStream) Protocols() []p2p.Protocol { + return []p2p.Protocol{ + { + Name: "bzz-stream", + Version: 1, + Length: 10 * 1024 * 1024, + Run: s.Run, + }, + } +} + +func (s *SlipStream) APIs() []rpc.API { + return []rpc.API{ + { + Namespace: "bzz-stream", + Version: "1.0", + Service: NewAPI(s), + Public: false, + }, + } +} + +// Additional public methods accessible through API for pss +type API struct { + *SlipStream +} + +func NewAPI(s *SlipStream) *API { + return &API{SlipStream: s} +} + +func (s *SlipStream) Start(server *p2p.Server) error { + log.Info("slip stream starting") + return nil +} + +func (s *SlipStream) Stop() error { + log.Info("slip stream closing") + s.mtx.Lock() + defer s.mtx.Unlock() + close(s.quit) + return nil +} diff --git a/network/newstream/wire.go b/network/newstream/wire.go new file mode 100644 index 0000000000..fb711b29f5 --- /dev/null +++ b/network/newstream/wire.go @@ -0,0 +1,174 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package newstream + +import ( + "context" + "fmt" + + "github.com/ethersphere/swarm/chunk" + "github.com/ethersphere/swarm/storage" +) + +// StreamProvider interface provides a lightweight abstraction that allows an easily-pluggable +// stream provider as part of the Stream! protocol specification. +// Since Stream! thoroughly defines the concepts of a stream, intervals, clients and servers, the +// interface therefore needs only a pluggable provider. +// The domain interpretable notions which are at the discretion of the implementing +// provider therefore are - sourcing data (get, put, subscribe for constant new data, and need data +// which is to decide whether to retrieve data or not), retrieving cursors from the data store, the +// implementation of which streams to maintain with a certain peer and providing functionality +// to expose, parse and encode values related to the string represntation of the stream +type StreamProvider interface { + + // NeedData informs the caller whether a certain chunk needs to be fetched from another peer or not. + // Typically this will involve checking whether a certain chunk exists locally. + // In case a chunk does not exist locally - a `wait` function returns upon chunk delivery + NeedData(ctx context.Context, key []byte) (need bool, wait func(context.Context) error) + + // Get a particular chunk identified by addr from the local storage + Get(ctx context.Context, addr chunk.Address) ([]byte, error) + + // Put a certain chunk into the local storage + Put(ctx context.Context, addr chunk.Address, data []byte) (exists bool, err error) + + // Subscribe to a data stream from an arbitrary data source + Subscribe(ctx context.Context, key interface{}, from, to uint64) (<-chan chunk.Descriptor, func()) + + // Cursor returns the last known Cursor for a given Stream Key + Cursor(interface{}) (uint64, error) + + // RunUpdateStreams is a provider specific implementation on how to maintain running streams with + // an arbitrary Peer. This method should always be run in a separate goroutine + RunUpdateStreams(p *Peer) + + // StreamName returns the Name of the Stream (see ID) + StreamName() string + + // ParseStream from a standard pipe-separated string and return the Stream Key + ParseKey(string) (interface{}, error) + + // EncodeStream from a Stream Key to a Stream pipe-separated string representation + EncodeKey(interface{}) (string, error) + + // StreamBehavior defines how the stream behaves upon initialisation + StreamBehavior() StreamInitBehavior + + Boundedness() bool +} + +// StreamInitBehavior defines the stream behavior upon init +type StreamInitBehavior int + +const ( + // StreamIdle means that there is no initial automatic message exchange + // between the nodes when the protocol gets established + StreamIdle StreamInitBehavior = iota + + // StreamGetCursors tells the two nodes to automatically fetch stream + // cursors from each other + StreamGetCursors + + // StreamAutostart automatically starts fetching data from the streams + // once the cursors arrive + StreamAutostart +) + +// StreamInfoReq is a request to get information about particular streams +type StreamInfoReq struct { + Streams []ID +} + +// StreamInfoRes is a response to StreamInfoReq with the corresponding stream descriptors +type StreamInfoRes struct { + Streams []StreamDescriptor +} + +// StreamDescriptor describes an arbitrary stream +type StreamDescriptor struct { + Stream ID + Cursor uint64 + Bounded bool +} + +// GetRange is a message sent from the downstream peer to the upstream peer asking for chunks +// within a particular interval for a certain stream +type GetRange struct { + Ruid uint + Stream ID + From uint64 + To uint64 `rlp:nil` + BatchSize uint + Roundtrip bool +} + +// OfferedHashes is a message sent from the upstream peer to the downstream peer allowing the latter +// to selectively ask for chunks within a particular requested interval +type OfferedHashes struct { + Ruid uint + LastIndex uint + Hashes []byte +} + +// WantedHashes is a message sent from the downstream peer to the upstream peer in response +// to OfferedHashes in order to selectively ask for a particular chunks within an interval +type WantedHashes struct { + Ruid uint + BitVector []byte +} + +// ChunkDelivery delivers a frame of chunks in response to a WantedHashes message +type ChunkDelivery struct { + Ruid uint + LastIndex uint + Chunks []DeliveredChunk +} + +// DeliveredChunk encapsulates a particular chunk's underlying data within a ChunkDelivery message +type DeliveredChunk struct { + Addr storage.Address //chunk address + Data []byte //chunk data +} + +// StreamState is a message exchanged between two nodes to notify of changes or errors in a stream's state +type StreamState struct { + Stream ID + Code uint16 + Message string +} + +// Stream defines a unique stream identifier in a textual representation +type ID struct { + // Name is used for the Stream provider identification + Name string + // Key is the name of specific data stream within the stream provider. The semantics of this value + // is at the discretion of the stream provider implementation + Key string +} + +// NewID returns a new Stream ID for a particular stream Name and Key +func NewID(name string, key string) ID { + return ID{ + Name: name, + Key: key, + } +} + +// String return a stream id based on all Stream fields. +func (s ID) String() string { + return fmt.Sprintf("%s|%s", s.Name, s.Key) +}