Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

network/newstream: new stream! protocol base implementation #1500

Merged
merged 85 commits into from
Jul 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
4b615a5
network/syncer: initial commit
acud May 24, 2019
081fd9f
network/syncer: change receiver name
acud Jun 20, 2019
271a3b6
network/syncer: finalize bin index exchange test
acud Jun 20, 2019
089d73c
network/syncer: add logic to determin peer transitions within depth
acud Jun 21, 2019
30475f0
network/syncer: more tests on establishing the correct initial stream…
acud Jun 21, 2019
e6e1ce5
network/syncer: create test case for nodes moving inside network depth
acud Jun 21, 2019
c0c087f
fixed the node item tests
acud Jun 21, 2019
660635b
network/syncer: general test case
acud Jun 22, 2019
f31557c
network/syncer: address some pr comments
acud Jun 22, 2019
800f247
network/syncer: fix pivot test - cursors were not saved
acud Jun 22, 2019
0003cc0
network/syncer: cleanup
acud Jun 22, 2019
5554fac
network/syncer: change cursor test condition
acud Jun 22, 2019
93135bc
network/syncer: fix dynamic test
acud Jun 22, 2019
ce9b517
network/syncer: add exclusivity test to bin cursors, cleanup
acud Jun 22, 2019
1906319
network/syncer: test node moves out of depth
acud Jun 22, 2019
bee9574
network/syncer: add sync fetchers, remove unused vars
acud Jun 23, 2019
dac9397
network/syncer: simplify test
acud Jun 23, 2019
debc16b
network/syncer: check historical streams
acud Jun 23, 2019
7f97475
network/syncer: check historical streams
acud Jun 23, 2019
869796b
network/syncer: rename function
acud Jun 23, 2019
5a0044b
network/syncer: check historical streams
acud Jun 23, 2019
ba548ac
network/syncer: assert streams removed on peer moved out of depth
acud Jun 23, 2019
3838de5
network/syncer: delete after channel close
acud Jun 23, 2019
966b2fe
network/syncer: add test to cover node moves into depth after leaving it
acud Jun 24, 2019
3889233
network/syncer: change syncBins to accomodate for both conventional p…
acud Jun 24, 2019
021759e
network/syncer: eliminate shadowing
acud Jun 25, 2019
6c272ba
wip try to find why this is not passing
acud Jun 25, 2019
b6d513c
network/syncer: TestNodeRemovesAndReestablishCurosrs passes
acud Jun 26, 2019
f600449
network/syncer: removed check that fails sporadically
acud Jun 26, 2019
4ee79de
network/syncer: cleanup
acud Jun 26, 2019
bd27ba4
network/syncer: add syncing test from stream pkg
acud Jun 26, 2019
c52f631
network/syncer: wip full syncing test
acud Jun 26, 2019
375100a
network/syncer: cleanup
acud Jun 26, 2019
56fe4e3
network/syncer: upload content to one node. test now expectedly fails
acud Jun 26, 2019
9a4e321
network/syncer: push data to one node then add the second one in orde…
acud Jun 26, 2019
516343b
network/syncer: add syncer method stubs
acud Jun 27, 2019
85179b6
network/syncer: reshuffle
acud Jun 27, 2019
b04e261
network/syncer: reinstate lock
acud Jun 27, 2019
0c394d8
network/syncer: test fails as wanted
acud Jun 27, 2019
61e4e7f
network/syncer: wip add roundtrip handling
acud Jun 27, 2019
f327559
network/syncer: just test for full sync at this point
acud Jun 27, 2019
fb228c2
network/syncer: wip chunk delivery
acud Jun 27, 2019
6a8faba
network/syncer: wip add intervals persistence from stream pkg, wip ba…
acud Jun 27, 2019
4be53a4
network/syncer: wip add intervals persistence from stream pkg, wip ba…
acud Jun 27, 2019
c522e90
network/syncer: wip chunk delivery
acud Jun 27, 2019
60a12e7
network/syncer: store some chunks
acud Jun 28, 2019
f73813e
network/syncer: historical syncing works
acud Jun 28, 2019
e556bbb
network/syncer: fix a few holes due to introduced concurrency
acud Jun 28, 2019
84a7e94
network/stream/intervals: fix commit
acud Jun 28, 2019
ce1bc0c
network/syncer: import stream ID from stream pkg
acud Jul 2, 2019
fba0520
network/syncer: wip abstract StreamProvider
acud Jul 2, 2019
d387680
network/stream/intervals: add ceiling argument to the Next method
janos Jul 3, 2019
e78b364
network/syncer: wip stream provider abstraction
acud Jul 3, 2019
509cb36
network/syncer: wip abstraction
acud Jul 3, 2019
6a11e30
network/syncer: rename sync provider file
acud Jul 3, 2019
97d5592
network/syncer: make new abstraction compile
acud Jul 3, 2019
18780ab
network/syncer: tests run but still not green
acud Jul 3, 2019
5f71255
network/syncer: cursors tests green when disbaling auto sync
acud Jul 3, 2019
d3da784
network/syncer: wip make sync test pass
acud Jul 3, 2019
ee43872
network/syncer: TestNodesExchangeCorrectBinIndexes green
acud Jul 4, 2019
179110f
network/syncer: wip make tests pass
acud Jul 4, 2019
35ed804
network/syncer: TestNodesExchangeCorrectBinIndexesInPivot green
acud Jul 4, 2019
8c689b2
network/syncer: introduce stream init behavior
acud Jul 4, 2019
aaf81bd
network/syncer: synchronise cursor delete
acud Jul 4, 2019
912a7f0
network/syncer: wip make tests pass
acud Jul 4, 2019
9d72a98
network/syncer: wip make tests pass
acud Jul 4, 2019
a50504b
network/syncer: test full sync mostly passes
acud Jul 4, 2019
4d6895a
network/stream/intervals: indicate if the Next range is empty
janos Jul 4, 2019
58343dc
network/syncer: cleanup
acud Jul 4, 2019
e1e9aeb
network/syncer: cleanup
acud Jul 4, 2019
5a282c7
network/syncer: fix TestNodeRemovesAndReestablishCursors peers map ac…
janos Jul 4, 2019
b93fa24
network/syncer: fix TestNodesExchangeCorrectBinIndexes data races
janos Jul 4, 2019
9c6b173
network/syncer: fix data races
janos Jul 4, 2019
7612f1b
network/syncer: remove panics, better tracing, set chunk as Set on Get
acud Jul 5, 2019
d11e669
network/syncer: fix panic on close of closed channel, cleanup
acud Jul 5, 2019
eaac56a
network: rename syncer->newstream
acud Jul 5, 2019
5b46ad3
network/newstream: rename package, improve docs
acud Jul 5, 2019
1c3fd87
network/newstream: rename peer_test.go
acud Jul 5, 2019
5a408cd
network/newstream: clean out spec for initial merge - just wire and b…
acud Jul 5, 2019
c8603c8
docs: propagate changes to wire protocol to the docs
acud Jul 5, 2019
1e7946e
network/bitvector: remove panic
acud Jul 5, 2019
b128333
network, storage: clean diff for initial pr
acud Jul 5, 2019
df8d517
vendor: clean diff
acud Jul 5, 2019
c99b3bc
pss: clean diff
acud Jul 5, 2019
6986049
network/newstream: fix linter
acud Jul 8, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 87 additions & 32 deletions docs/Stream-Protocol-Spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ Wire Protocol Specifications

| Msg Name | From->To | Params | Example |
| -------- | -------- | -------- | ------- |
| StreamInfoReq | Client->Server | Streams`[]string` | `SYNC\|6, SYNC\|5` |
| StreamInfoRes | Server->Client | Streams`[]StreamDescriptor` <br>Stream`string`<br>Cursor`uint64`<br>Bounded`bool` | `SYNC\|6;CUR=1632;bounded, SYNC\|7;CUR=18433;bounded` |
| StreamInfoReq | Client->Server | Streams`[]ID` | `SYNC\|6, SYNC\|5` |
| StreamInfoRes | Server->Client | Streams`[]StreamDescriptor` <br>Stream`ID`<br>Cursor`uint64`<br>Bounded`bool` | `SYNC\|6;CUR=1632;bounded, SYNC\|7;CUR=18433;bounded` |
| GetRange | Client->Server| Ruid`uint`<br>Stream `string`<br>From`uint`<br>To`*uint`(nullable)<br>Roundtrip`bool` | `Ruid: 21321, Stream: SYNC\|6, From: 1, To: 100`(bounded), Roundtrip: true<br>`Stream: SYNC\|7, From: 109, Roundtrip: true`(unbounded) |
| OfferedHashes | Server->Client| Ruid`uint`<br>Hashes `[]byte` | `Ruid: 21321, Hashes: [cbcbbaddda, bcbbbdbbdc, ....]` |
| WantedHashes | Client->Server | Ruid`uint`<br>Bitvector`[]byte` | `Ruid: 21321, Bitvector: [0100100100] ` |
Expand All @@ -81,80 +81,135 @@ Notes:
* two notions of bounded - on the stream level and on the localstore
* if TO is not specified - we assume unbounded stream, and we just send whatever, until at most, we fill up an entire batch.

### Message struct definitions:
### Message and interface definitions:


```go
// StreamProvider interface provides a lightweight abstraction that allows an easily-pluggable
// stream provider as part of the Stream! protocol specification.
type StreamProvider interface {
NeedData(ctx context.Context, key []byte) (need bool, wait func(context.Context) error)
Get(ctx context.Context, addr chunk.Address) ([]byte, error)
Put(ctx context.Context, addr chunk.Address, data []byte) (exists bool, err error)
Subscribe(ctx context.Context, key interface{}, from, to uint64) (<-chan chunk.Descriptor, func())
Cursor(interface{}) (uint64, error)
RunUpdateStreams(p *Peer)
StreamName() string
ParseKey(string) (interface{}, error)
EncodeKey(interface{}) (string, error)
StreamBehavior() StreamInitBehavior
Boundedness() bool
}
```

```go
type StreamInitBehavior int
```

```go
// StreamInfoReq is a request to get information about particular streams
type StreamInfoReq struct {
Streams []string
Streams []ID
}
```

```go
// StreamInfoRes is a response to StreamInfoReq with the corresponding stream descriptors
type StreamInfoRes struct {
Streams []StreamDescriptor
Streams []StreamDescriptor
}
```

```go
// StreamDescriptor describes an arbitrary stream
type StreamDescriptor struct {
Name string
Cursor uint
Bounded bool
Stream ID
Cursor uint64
Bounded bool
}
```

```go
// GetRange is a message sent from the downstream peer to the upstream peer asking for chunks
// within a particular interval for a certain stream
type GetRange struct {
Ruid uint
Stream string
From uint
To uint `rlp:nil`
BatchSize uint
Roundtrip bool
Ruid uint
Stream ID
From uint64
To uint64 `rlp:nil`
BatchSize uint
Roundtrip bool
}
```

```go
// OfferedHashes is a message sent from the upstream peer to the downstream peer allowing the latter
// to selectively ask for chunks within a particular requested interval
type OfferedHashes struct {
Ruid uint
LastIndex uint
Hashes []byte
Ruid uint
LastIndex uint
Hashes []byte
}
```

```go
// WantedHashes is a message sent from the downstream peer to the upstream peer in response
// to OfferedHashes in order to selectively ask for a particular chunks within an interval
type WantedHashes struct {
Ruid uint
BitVector []byte
Ruid uint
BitVector []byte
}
```

```go
// ChunkDelivery delivers a frame of chunks in response to a WantedHashes message
type ChunkDelivery struct {
Ruid uint
LastIndex uint
Chunks [][]byte
Ruid uint
LastIndex uint
Chunks []DeliveredChunk
}
```

```go
type BatchDone struct {
Ruid uint
Last uint
// DeliveredChunk encapsulates a particular chunk's underlying data within a ChunkDelivery message
type DeliveredChunk struct {
Addr storage.Address
Data []byte
}
```

```go
// StreamState is a message exchanged between two nodes to notify of changes or errors in a stream's state
type StreamState struct {
Stream string
Code uint16
Message string
Stream ID
Code uint16
Message string
}
```

```go
// Stream defines a unique stream identifier in a textual representation
type ID struct {
// Name is used for the Stream provider identification
Name string
// Key is the name of specific data stream within the stream provider. The semantics of this value
// is at the discretion of the stream provider implementation
Key string
}
```

Message exchange examples:
======

Initial handshake - client queries server for stream states<br>
![handshake](https://raw.githubusercontent.com/ethersphere/swarm/stream-spec/docs/diagrams/stream-handshake.png)
![handshake](https://raw.githubusercontent.com/ethersphere/swarm/master/docs/diagrams/stream-handshake.png)
<br>
GetRange (bounded) - client requests a bounded range within a stream<br>
![bounded-range](https://raw.githubusercontent.com/ethersphere/swarm/stream-spec/docs/diagrams/stream-bounded.png)
![bounded-range](https://raw.githubusercontent.com/ethersphere/swarm/master/docs/diagrams/stream-bounded.png)
<br>
GetRange (unbounded) - client requests an unbounded range (specifies only `From` parameter)<br>
![unbounded-range](https://raw.githubusercontent.com/ethersphere/swarm/stream-spec/docs/diagrams/stream-unbounded.png)
![unbounded-range](https://raw.githubusercontent.com/ethersphere/swarm/master/docs/diagrams/stream-unbounded.png)
<br>
GetRange (no roundtrip) - client requests an unbounded or bounded range with no roundtrip configured<br>
![unbounded-range](https://raw.githubusercontent.com/ethersphere/swarm/stream-spec/docs/diagrams/stream-no-roundtrip.png)
![unbounded-range](https://raw.githubusercontent.com/ethersphere/swarm/master/docs/diagrams/stream-no-roundtrip.png)

65 changes: 65 additions & 0 deletions network/newstream/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2019 The Swarm Authors
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a separate common_test file?

// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.

package newstream

import (
"flag"
"io/ioutil"
"os"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/storage/localstore"
"github.com/ethersphere/swarm/storage/mock"
)

var (
loglevel = flag.Int("loglevel", 5, "verbosity of logs")
)

func init() {
flag.Parse()

log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(false))))
}

func newTestLocalStore(id enode.ID, addr *network.BzzAddr, globalStore mock.GlobalStorer) (localStore *localstore.DB, cleanup func(), err error) {
dir, err := ioutil.TempDir("", "swarm-stream-")
if err != nil {
return nil, nil, err
}
cleanup = func() {
os.RemoveAll(dir)
}

var mockStore *mock.NodeStore
if globalStore != nil {
mockStore = globalStore.NewNodeStore(common.BytesToAddress(id.Bytes()))
}

localStore, err = localstore.New(dir, addr.Over(), &localstore.Options{
MockStore: mockStore,
})
if err != nil {
cleanup()
return nil, nil, err
}
return localStore, cleanup, nil
}
100 changes: 100 additions & 0 deletions network/newstream/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.

package newstream

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/network/bitvector"
"github.com/ethersphere/swarm/state"
)

var ErrEmptyBatch = errors.New("empty batch")

const (
HashSize = 32
BatchSize = 16
)

// Peer is the Peer extension for the streaming protocol
type Peer struct {
*network.BzzPeer
mtx sync.Mutex
providers map[string]StreamProvider
intervalsStore state.Store

streamCursorsMu sync.Mutex
streamCursors map[string]uint64 // key: Stream ID string representation, value: session cursor. Keeps cursors for all streams. when unset - we are not interested in that bin
dirtyStreams map[string]bool // key: stream ID, value: whether cursors for a stream should be updated
activeBoundedGets map[string]chan struct{}
openWants map[uint]*want // maintain open wants on the client side
openOffers map[uint]offer // maintain open offers on the server side
quit chan struct{} // closed when peer is going offline
}

// NewPeer is the constructor for Peer
func NewPeer(peer *network.BzzPeer, i state.Store, providers map[string]StreamProvider) *Peer {
p := &Peer{
BzzPeer: peer,
providers: providers,
intervalsStore: i,
streamCursors: make(map[string]uint64),
dirtyStreams: make(map[string]bool),
openWants: make(map[uint]*want),
openOffers: make(map[uint]offer),
quit: make(chan struct{}),
}
return p
}
func (p *Peer) Left() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used at one place. Why not directly inline close(p.quit) there?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

close(p.quit)
}

// HandleMsg is the message handler that delegates incoming messages
func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
switch msg := msg.(type) {
default:
return fmt.Errorf("unknown message type: %T", msg)
}
return nil
}

type offer struct {
ruid uint
stream ID
hashes []byte
requested time.Time
}

type want struct {
ruid uint
from uint64
to uint64
stream ID
hashes map[string]bool
bv *bitvector.BitVector
requested time.Time
remaining uint64
chunks chan chunk.Chunk
done chan error
}
Loading