Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
pushsync: initial implementation (#1782)
Browse files Browse the repository at this point in the history
  • Loading branch information
janos authored Sep 25, 2019
1 parent 1b5c77c commit d29dfb1
Show file tree
Hide file tree
Showing 48 changed files with 2,044 additions and 152 deletions.
14 changes: 7 additions & 7 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestApiPut(t *testing.T) {
resp := testGet(t, api, addr.Hex(), "")
checkResponse(t, resp, exp)
tag := tags.All()[0]
chunktesting.CheckTag(t, tag, 2, 2, 0, 2) //1 chunk data, 1 chunk manifest
chunktesting.CheckTag(t, tag, 2, 2, 0, 0, 0, 2) //1 chunk data, 1 chunk manifest
})
}

Expand All @@ -168,11 +168,11 @@ func TestApiTagLarge(t *testing.T) {
if toEncrypt {
tag := tags.All()[0]
expect := int64(4095 + 64 + 1)
chunktesting.CheckTag(t, tag, expect, expect, 0, expect)
chunktesting.CheckTag(t, tag, expect, expect, 0, 0, 0, expect)
} else {
tag := tags.All()[0]
expect := int64(4095 + 32 + 1)
chunktesting.CheckTag(t, tag, expect, expect, 0, expect)
chunktesting.CheckTag(t, tag, expect, expect, 0, 0, 0, expect)
}
})
}
Expand Down Expand Up @@ -551,16 +551,16 @@ func putString(ctx context.Context, a *API, content string, contentType string,
r := strings.NewReader(content)
tag, err := a.Tags.Create("unnamed-tag", 0)

log.Trace("created new tag", "uid", tag.Uid)
log.Trace("created new tag", "id", tag.Uid)

cCtx := sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt)
ctx = sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(ctx, r, int64(len(content)), toEncrypt)
if err != nil {
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
key, waitManifest, err := a.Store(ctx, r, int64(len(manifest)), toEncrypt)
if err != nil {
return nil, nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestClientUploadDownloadRaw(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 1, 1, 0, 1)
chunktesting.CheckTag(t, tag, 1, 1, 0, 0, 0, 1)
}

func TestClientUploadDownloadRawEncrypted(t *testing.T) {
Expand All @@ -69,7 +69,7 @@ func TestClientUploadDownloadRawEncrypted(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 1, 1, 0, 1)
chunktesting.CheckTag(t, tag, 1, 1, 0, 0, 0, 1)
}

func testClientUploadDownloadRaw(srv *swarmhttp.TestSwarmServer, toEncrypt bool, t *testing.T, data []byte, toPin bool) string {
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestClientUploadDownloadDirectory(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 9, 9, 0, 9)
chunktesting.CheckTag(t, tag, 9, 9, 0, 0, 0, 9)

// check we can download the individual files
checkDownloadFile := func(path string, expected []byte) {
Expand Down Expand Up @@ -372,7 +372,7 @@ func TestClientMultipartUpload(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 9, 9, 0, 9)
chunktesting.CheckTag(t, tag, 9, 9, 0, 0, 0, 9)

// check we can download the individual files
checkDownloadFile := func(path string) {
Expand Down
2 changes: 2 additions & 0 deletions api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Config struct {
Enode *enode.Node `toml:"-"`
NetworkID uint64
SyncEnabled bool
PushSyncEnabled bool
SyncingSkipCheck bool
DeliverySkipCheck bool
MaxStreamPeerServers int
Expand Down Expand Up @@ -105,6 +106,7 @@ func NewConfig() *Config {
Port: DefaultHTTPPort,
NetworkID: network.DefaultNetworkID,
SyncEnabled: true,
PushSyncEnabled: false,
SyncingSkipCheck: false,
DeliverySkipCheck: true,
MaxStreamPeerServers: 10000,
Expand Down
28 changes: 20 additions & 8 deletions api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/sctx"
"github.com/ethersphere/swarm/spancontext"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/storage/feed"
"github.com/ethersphere/swarm/storage/pin"
Expand Down Expand Up @@ -282,10 +283,10 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
ruid := GetRUID(r.Context())
log.Debug("handle.post.raw", "ruid", ruid)

tagUid := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUid)
tagUID := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUID)
if err != nil {
log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUID", tagUID, "err", err)
}

postRawCount.Inc(1)
Expand Down Expand Up @@ -340,7 +341,7 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
}

w.Header().Set("Content-Type", "text/plain")
w.Header().Set(TagHeaderName, fmt.Sprint(tagUid))
w.Header().Set(TagHeaderName, fmt.Sprint(tagUID))
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, addr)
}
Expand All @@ -355,6 +356,17 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
log.Debug("handle.post.files", "ruid", ruid)
postFilesCount.Inc(1)

tagUID := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUID)
if err != nil {
log.Error("handle post raw got an error retrieving tag", "tagUID", tagUID, "err", err)
}

// start an http.post span to measure how long the HTTP POST request took, and link it with the tag.Context()
// N.B. this is independent context (used for tracing), to the HTTP request context - r.Context()
_, sp := spancontext.StartSpan(tag.Context(), "http.post")
defer sp.Finish()

contentType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
postFilesFail.Inc(1)
Expand Down Expand Up @@ -411,10 +423,10 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
return
}

tagUid := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUid)
tagUID = sctx.GetTag(r.Context())
tag, err = s.api.Tags.Get(tagUID)
if err != nil {
log.Error("got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
log.Error("got an error retrieving tag for DoneSplit", "tagUID", tagUID, "err", err)
}

log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.TotalCounter())
Expand All @@ -433,7 +445,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
log.Debug("stored content", "ruid", ruid, "key", newAddr)

w.Header().Set("Content-Type", "text/plain")
w.Header().Set(TagHeaderName, fmt.Sprint(tagUid))
w.Header().Set(TagHeaderName, fmt.Sprint(tagUID))
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, newAddr)
}
Expand Down
4 changes: 2 additions & 2 deletions api/http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ func testBzzTar(encrypted bool, t *testing.T) {

// check that the tag was written correctly
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 4, 4, 0, 4)
chunktesting.CheckTag(t, tag, 4, 4, 0, 0, 0, 4)

swarmHash, err := ioutil.ReadAll(resp2.Body)
resp2.Body.Close()
Expand Down Expand Up @@ -1081,7 +1081,7 @@ func TestBzzCorrectTagEstimate(t *testing.T) {
<-time.After(10 * time.Millisecond)
case 1:
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 0, 0, 0, v.expChunks)
chunktesting.CheckTag(t, tag, 0, 0, 0, 0, 0, v.expChunks)
srv.Tags.Delete(tag.Uid)
done = true
}
Expand Down
15 changes: 15 additions & 0 deletions api/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
stream "github.com/ethersphere/swarm/network/stream/v2"
Expand Down Expand Up @@ -53,6 +54,20 @@ func (i *Inspector) KademliaInfo() network.KademliaInfo {
return i.hive.KademliaInfo()
}

func (i *Inspector) IsPushSynced(tagname string) bool {
tags := i.api.Tags.All()

for _, t := range tags {
if t.Name == tagname {
ds := t.Done(chunk.StateSynced)
log.Trace("found tag", "tagname", tagname, "done-syncing", ds)
return ds
}
}

return false
}

func (i *Inspector) IsPullSyncing() bool {
t := i.stream.LastReceivedChunkTime()

Expand Down
12 changes: 12 additions & 0 deletions chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ type Chunk interface {
Data() []byte
PinCounter() uint64
WithPinCounter(p uint64) Chunk
TagID() uint32
WithTagID(t uint32) Chunk
}

type chunk struct {
addr Address
sdata []byte
pinCounter uint64
tagID uint32
}

func NewChunk(addr Address, data []byte) Chunk {
Expand All @@ -60,6 +63,11 @@ func (c *chunk) WithPinCounter(p uint64) Chunk {
return c
}

func (c *chunk) WithTagID(t uint32) Chunk {
c.tagID = t
return c
}

func (c *chunk) Address() Address {
return c.addr
}
Expand All @@ -72,6 +80,10 @@ func (c *chunk) PinCounter() uint64 {
return c.pinCounter
}

func (c *chunk) TagID() uint32 {
return c.tagID
}

func (self *chunk) String() string {
return fmt.Sprintf("Address: %v Chunksize: %v", self.addr.Log(), len(self.sdata))
}
Expand Down
66 changes: 58 additions & 8 deletions chunk/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
package chunk

import (
"context"
"encoding/binary"
"errors"
"sync/atomic"
"time"

"github.com/ethersphere/swarm/spancontext"
"github.com/opentracing/opentracing-go"
)

var (
Expand Down Expand Up @@ -53,22 +57,39 @@ type Tag struct {
Sent int64 // number of chunks sent for push syncing
Synced int64 // number of chunks synced with proof
StartedAt time.Time // tag started to calculate ETA

// end-to-end tag tracing
ctx context.Context // tracing context
span opentracing.Span // tracing root span
}

// New creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists
// NewTag creates a new tag, and returns it
func NewTag(uid uint32, s string, total int64) *Tag {
t := &Tag{
Uid: uid,
Name: s,
StartedAt: time.Now(),
Total: total,
}

// context here is used only to store the root span `new.upload.tag` within Tag,
// we don't need any type of ctx Deadline or cancellation for this particular ctx
t.ctx, t.span = spancontext.StartSpan(context.Background(), "new.upload.tag")
return t
}

// Inc increments the count for a state
func (t *Tag) Inc(state State) {
// Context accessor
func (t *Tag) Context() context.Context {
return t.ctx
}

// FinishRootSpan closes the pushsync span of the tags
func (t *Tag) FinishRootSpan() {
t.span.Finish()
}

// IncN increments the count for a state
func (t *Tag) IncN(state State, n int) {
var v *int64
switch state {
case StateSplit:
Expand All @@ -82,7 +103,12 @@ func (t *Tag) Inc(state State) {
case StateSynced:
v = &t.Synced
}
atomic.AddInt64(v, 1)
atomic.AddInt64(v, int64(n))
}

// Inc increments the count for a state
func (t *Tag) Inc(state State) {
t.IncN(state, 1)
}

// Get returns the count for a state on a tag
Expand All @@ -108,6 +134,32 @@ func (t *Tag) TotalCounter() int64 {
return atomic.LoadInt64(&t.Total)
}

// WaitTillDone returns without error once the tag is complete
// wrt the state given as argument
// it returns an error if the context is done
func (t *Tag) WaitTillDone(ctx context.Context, s State) error {
if t.Done(s) {
return nil
}
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ticker.C:
if t.Done(s) {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}

// Done returns true if tag is complete wrt the state given as argument
func (t *Tag) Done(s State) bool {
n, total, err := t.Status(s)
return err == nil && n == total
}

// DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag
// is meant to be called when splitter finishes for input streams of unknown size
func (t *Tag) DoneSplit(address Address) int64 {
Expand Down Expand Up @@ -168,9 +220,7 @@ func (tag *Tag) MarshalBinary() (data []byte, err error) {

n = binary.PutVarint(intBuffer, int64(len(tag.Address)))
buffer = append(buffer, intBuffer[:n]...)

buffer = append(buffer, tag.Address...)

buffer = append(buffer, tag.Address[:]...)
buffer = append(buffer, []byte(tag.Name)...)

return buffer, nil
Expand Down
Loading

0 comments on commit d29dfb1

Please sign in to comment.