Skip to content

Commit

Permalink
cmd/swarm/swarm-smoke: refactor generateEndpoints (#19006)
Browse files Browse the repository at this point in the history
  • Loading branch information
nonsense authored Feb 7, 2019
1 parent 7f55b0c commit d212535
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 191 deletions.
67 changes: 40 additions & 27 deletions cmd/swarm/swarm-smoke/feed_upload_and_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"bytes"
"crypto/md5"
crand "crypto/rand"
"fmt"
"io"
"io/ioutil"
Expand All @@ -16,7 +15,9 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/ethereum/go-ethereum/swarm/testutil"
"github.com/pborman/uuid"
cli "gopkg.in/urfave/cli.v1"
)
Expand All @@ -25,13 +26,28 @@ const (
feedRandomDataLength = 8
)

// TODO: retrieve with manifest + extract repeating code
func feedUploadAndSync(c *cli.Context) error {
defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size (kb)", filesize) }(time.Now())
func feedUploadAndSyncCmd(ctx *cli.Context, tuid string) error {
errc := make(chan error)

generateEndpoints(scheme, cluster, appName, from, to)
go func() {
errc <- feedUploadAndSync(ctx, tuid)
}()

log.Info("generating and uploading feeds to " + endpoints[0] + " and syncing")
select {
case err := <-errc:
if err != nil {
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
}
return err
case <-time.After(time.Duration(timeout) * time.Second):
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)

return fmt.Errorf("timeout after %v sec", timeout)
}
}

func feedUploadAndSync(c *cli.Context, tuid string) error {
log.Info("generating and uploading feeds to " + httpEndpoint(hosts[0]) + " and syncing")

// create a random private key to sign updates with and derive the address
pkFile, err := ioutil.TempFile("", "swarm-feed-smoke-test")
Expand Down Expand Up @@ -85,7 +101,7 @@ func feedUploadAndSync(c *cli.Context) error {

// create feed manifest, topic only
var out bytes.Buffer
cmd := exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--topic", topicHex, "--user", userHex)
cmd := exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--topic", topicHex, "--user", userHex)
cmd.Stdout = &out
log.Debug("create feed manifest topic cmd", "cmd", cmd)
err = cmd.Run()
Expand All @@ -100,7 +116,7 @@ func feedUploadAndSync(c *cli.Context) error {
out.Reset()

// create feed manifest, subtopic only
cmd = exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--name", subTopicHex, "--user", userHex)
cmd = exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--name", subTopicHex, "--user", userHex)
cmd.Stdout = &out
log.Debug("create feed manifest subtopic cmd", "cmd", cmd)
err = cmd.Run()
Expand All @@ -115,7 +131,7 @@ func feedUploadAndSync(c *cli.Context) error {
out.Reset()

// create feed manifest, merged topic
cmd = exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--topic", topicHex, "--name", subTopicHex, "--user", userHex)
cmd = exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--topic", topicHex, "--name", subTopicHex, "--user", userHex)
cmd.Stdout = &out
log.Debug("create feed manifest mergetopic cmd", "cmd", cmd)
err = cmd.Run()
Expand All @@ -141,7 +157,7 @@ func feedUploadAndSync(c *cli.Context) error {
dataHex := hexutil.Encode(data)

// update with topic
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, dataHex)
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, dataHex)
cmd.Stdout = &out
log.Debug("update feed manifest topic cmd", "cmd", cmd)
err = cmd.Run()
Expand All @@ -152,7 +168,7 @@ func feedUploadAndSync(c *cli.Context) error {
out.Reset()

// update with subtopic
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--name", subTopicHex, dataHex)
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--name", subTopicHex, dataHex)
cmd.Stdout = &out
log.Debug("update feed manifest subtopic cmd", "cmd", cmd)
err = cmd.Run()
Expand All @@ -163,7 +179,7 @@ func feedUploadAndSync(c *cli.Context) error {
out.Reset()

// update with merged topic
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, "--name", subTopicHex, dataHex)
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, "--name", subTopicHex, dataHex)
cmd.Stdout = &out
log.Debug("update feed manifest merged topic cmd", "cmd", cmd)
err = cmd.Run()
Expand All @@ -177,36 +193,33 @@ func feedUploadAndSync(c *cli.Context) error {

// retrieve the data
wg := sync.WaitGroup{}
for _, endpoint := range endpoints {
for _, host := range hosts {
// raw retrieve, topic only
for _, hex := range []string{topicHex, subTopicOnlyHex, mergedSubTopicHex} {
wg.Add(1)
ruid := uuid.New()[:8]
go func(hex string, endpoint string, ruid string) {
for {
err := fetchFeed(hex, userHex, endpoint, dataHash, ruid)
err := fetchFeed(hex, userHex, httpEndpoint(host), dataHash, ruid)
if err != nil {
continue
}

wg.Done()
return
}
}(hex, endpoint, ruid)

}(hex, httpEndpoint(host), ruid)
}
}
wg.Wait()
log.Info("all endpoints synced random data successfully")

// upload test file
seed := int(time.Now().UnixNano() / 1e6)
log.Info("feed uploading to "+endpoints[0]+" and syncing", "seed", seed)
log.Info("feed uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed)

h = md5.New()
r := io.TeeReader(io.LimitReader(crand.Reader, int64(filesize*1000)), h)
randomBytes := testutil.RandomBytes(seed, filesize*1000)

hash, err := upload(r, filesize*1000, endpoints[0])
hash, err := upload(randomBytes, httpEndpoint(hosts[0]))
if err != nil {
return err
}
Expand All @@ -220,7 +233,7 @@ func feedUploadAndSync(c *cli.Context) error {
log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fileHash))

// update file with topic
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, multihashHex)
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, multihashHex)
cmd.Stdout = &out
err = cmd.Run()
if err != nil {
Expand All @@ -230,7 +243,7 @@ func feedUploadAndSync(c *cli.Context) error {
out.Reset()

// update file with subtopic
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--name", subTopicHex, multihashHex)
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--name", subTopicHex, multihashHex)
cmd.Stdout = &out
err = cmd.Run()
if err != nil {
Expand All @@ -240,7 +253,7 @@ func feedUploadAndSync(c *cli.Context) error {
out.Reset()

// update file with merged topic
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, "--name", subTopicHex, multihashHex)
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, "--name", subTopicHex, multihashHex)
cmd.Stdout = &out
err = cmd.Run()
if err != nil {
Expand All @@ -251,23 +264,23 @@ func feedUploadAndSync(c *cli.Context) error {

time.Sleep(3 * time.Second)

for _, endpoint := range endpoints {
for _, host := range hosts {

// manifest retrieve, topic only
for _, url := range []string{manifestWithTopic, manifestWithSubTopic, manifestWithMergedTopic} {
wg.Add(1)
ruid := uuid.New()[:8]
go func(url string, endpoint string, ruid string) {
for {
err := fetch(url, endpoint, fileHash, ruid)
err := fetch(url, endpoint, fileHash, ruid, "")
if err != nil {
continue
}

wg.Done()
return
}
}(url, endpoint, ruid)
}(url, httpEndpoint(host), ruid)
}

}
Expand Down
70 changes: 25 additions & 45 deletions cmd/swarm/swarm-smoke/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,15 @@ var (
)

var (
endpoints []string
includeLocalhost bool
cluster string
appName string
scheme string
filesize int
syncDelay int
from int
to int
verbosity int
timeout int
single bool
allhosts string
hosts []string
filesize int
syncDelay int
httpPort int
wsPort int
verbosity int
timeout int
single bool
)

func main() {
Expand All @@ -59,39 +56,22 @@ func main() {

app.Flags = []cli.Flag{
cli.StringFlag{
Name: "cluster-endpoint",
Value: "prod",
Usage: "cluster to point to (prod or a given namespace)",
Destination: &cluster,
},
cli.StringFlag{
Name: "app",
Value: "swarm",
Usage: "application to point to (swarm or swarm-private)",
Destination: &appName,
Name: "hosts",
Value: "",
Usage: "comma-separated list of swarm hosts",
Destination: &allhosts,
},
cli.IntFlag{
Name: "cluster-from",
Value: 8501,
Usage: "swarm node (from)",
Destination: &from,
Name: "http-port",
Value: 80,
Usage: "http port",
Destination: &httpPort,
},
cli.IntFlag{
Name: "cluster-to",
Value: 8512,
Usage: "swarm node (to)",
Destination: &to,
},
cli.StringFlag{
Name: "cluster-scheme",
Value: "http",
Usage: "http or https",
Destination: &scheme,
},
cli.BoolFlag{
Name: "include-localhost",
Usage: "whether to include localhost:8500 as an endpoint",
Destination: &includeLocalhost,
Name: "ws-port",
Value: 8546,
Usage: "ws port",
Destination: &wsPort,
},
cli.IntFlag{
Name: "filesize",
Expand Down Expand Up @@ -140,25 +120,25 @@ func main() {
Name: "upload_and_sync",
Aliases: []string{"c"},
Usage: "upload and sync",
Action: wrapCliCommand("upload-and-sync", true, uploadAndSync),
Action: wrapCliCommand("upload-and-sync", uploadAndSyncCmd),
},
{
Name: "feed_sync",
Aliases: []string{"f"},
Usage: "feed update generate, upload and sync",
Action: wrapCliCommand("feed-and-sync", true, feedUploadAndSync),
Action: wrapCliCommand("feed-and-sync", feedUploadAndSyncCmd),
},
{
Name: "upload_speed",
Aliases: []string{"u"},
Usage: "measure upload speed",
Action: wrapCliCommand("upload-speed", true, uploadSpeed),
Action: wrapCliCommand("upload-speed", uploadSpeedCmd),
},
{
Name: "sliding_window",
Aliases: []string{"s"},
Usage: "measure network aggregate capacity",
Action: wrapCliCommand("sliding-window", false, slidingWindow),
Action: wrapCliCommand("sliding-window", slidingWindowCmd),
},
}

Expand Down
Loading

0 comments on commit d212535

Please sign in to comment.