Skip to content

Commit

Permalink
feat(edssser): introduce EDS Store Stresser and cel-shed utility (cel…
Browse files Browse the repository at this point in the history
…estiaorg#2482)

(cherry picked from commit 628559b)
  • Loading branch information
Wondertan authored and walldiss committed Sep 22, 2023
1 parent c6ee275 commit c3d4219
Show file tree
Hide file tree
Showing 4 changed files with 351 additions and 4 deletions.
165 changes: 165 additions & 0 deletions cmd/cel-shed/eds_store_stress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package main

import (
"context"
"errors"
_ "expvar"
"fmt"
"math"
"net/http"
"os"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir"
"github.com/pyroscope-io/client/pyroscope"
"github.com/spf13/cobra"

"github.com/celestiaorg/celestia-node/libs/edssser"
"github.com/celestiaorg/celestia-node/nodebuilder"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
)

const (
edsStorePathFlag = "path"
edsWritesFlag = "writes"
edsSizeFlag = "size"
edsDisableLogFlag = "disable-log"
edsLogStatFreqFlag = "log-stat-freq"
edsCleanupFlag = "cleanup"
edsFreshStartFlag = "fresh"

pyroscopeEndpointFlag = "pyroscope"
putTimeoutFlag = "timeout"
badgerLogLevelFlag = "badger-log-level"
)

func init() {
edsStoreCmd.AddCommand(edsStoreStress)

defaultPath := "~/.edssser"
path, err := homedir.Expand(defaultPath)
if err != nil {
panic(err)
}

pathFlagUsage := fmt.Sprintf("Directory path to use for stress test. Uses %s by default.", defaultPath)
edsStoreStress.Flags().String(edsStorePathFlag, path, pathFlagUsage)
edsStoreStress.Flags().String(pyroscopeEndpointFlag, "",
"Pyroscope address. If no address provided, pyroscope will be disabled")
edsStoreStress.Flags().Int(edsWritesFlag, math.MaxInt, "Total EDS writes to make. MaxInt by default.")
edsStoreStress.Flags().Int(edsSizeFlag, 128, "Chooses EDS size. 128 by default.")
edsStoreStress.Flags().Bool(edsDisableLogFlag, false, "Disables logging. Enabled by default.")
edsStoreStress.Flags().Int(edsLogStatFreqFlag, 10, "Write statistic logging frequency. 10 by default.")
edsStoreStress.Flags().Bool(edsCleanupFlag, false, "Cleans up the store on stop. Disabled by default.")
edsStoreStress.Flags().Bool(edsFreshStartFlag, false, "Cleanup previous state on start. Disabled by default.")
edsStoreStress.Flags().Int(putTimeoutFlag, 30, "Sets put timeout in seconds. 30 sec by default.")
edsStoreStress.Flags().String(badgerLogLevelFlag, "INFO", "Badger log level, Defaults to INFO")

// kill redundant print
nodebuilder.PrintKeyringInfo = false
}

var edsStoreCmd = &cobra.Command{
Use: "eds-store [subcommand]",
Short: "Collection of eds-store related utilities",
}

var edsStoreStress = &cobra.Command{
Use: "stress",
Short: `Runs eds.Store stress test over default node.Store Datastore backend (e.g. Badger).`,
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) (err error) {
// expose expvar vars over http
go http.ListenAndServe(":9999", http.DefaultServeMux) //nolint:errcheck,gosec

endpoint, _ := cmd.Flags().GetString(pyroscopeEndpointFlag)
if endpoint != "" {
_, err = pyroscope.Start(pyroscope.Config{
ApplicationName: "cel-shred.stresser",
ServerAddress: endpoint,
ProfileTypes: []pyroscope.ProfileType{
pyroscope.ProfileCPU,
pyroscope.ProfileAllocObjects,
pyroscope.ProfileAllocSpace,
pyroscope.ProfileInuseObjects,
pyroscope.ProfileInuseSpace,
},
})
if err != nil {
fmt.Printf("failed to launch pyroscope with addr: %s err: %s\n", endpoint, err.Error())
} else {
fmt.Println("connected pyroscope to:", endpoint)
}
}

path, _ := cmd.Flags().GetString(edsStorePathFlag)
fmt.Printf("using %s\n", path)

freshStart, _ := cmd.Flags().GetBool(edsFreshStartFlag)
if freshStart {
err = os.RemoveAll(path)
if err != nil {
return err
}
}

cleanup, _ := cmd.Flags().GetBool(edsCleanupFlag)
if cleanup {
defer func() {
err = errors.Join(err, os.RemoveAll(path))
}()
}

loglevel, _ := cmd.Flags().GetString(badgerLogLevelFlag)
if err = logging.SetLogLevel("badger", loglevel); err != nil {
return err
}

disableLog, _ := cmd.Flags().GetBool(edsDisableLogFlag)
logFreq, _ := cmd.Flags().GetInt(edsLogStatFreqFlag)
edsWrites, _ := cmd.Flags().GetInt(edsWritesFlag)
edsSize, _ := cmd.Flags().GetInt(edsSizeFlag)
putTimeout, _ := cmd.Flags().GetInt(putTimeoutFlag)

cfg := edssser.Config{
EDSSize: edsSize,
EDSWrites: edsWrites,
EnableLog: !disableLog,
LogFilePath: path,
StatLogFreq: logFreq,
OpTimeout: time.Duration(putTimeout) * time.Second,
}

err = nodebuilder.Init(*nodebuilder.DefaultConfig(node.Full), path, node.Full)
if err != nil {
return err
}

nodestore, err := nodebuilder.OpenStore(path, nil)
if err != nil {
return err
}
defer func() {
err = errors.Join(err, nodestore.Close())
}()

datastore, err := nodestore.Datastore()
if err != nil {
return err
}

stresser, err := edssser.NewEDSsser(path, datastore, cfg)
if err != nil {
return err
}

stats, err := stresser.Run(cmd.Context())
if !errors.Is(err, context.Canceled) {
return err
}

fmt.Printf("%s", stats.Finalize())
return nil
},
}
9 changes: 7 additions & 2 deletions cmd/cel-shed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package main
import (
"context"
"os"
"os/signal"
"syscall"

"github.com/spf13/cobra"
)

func init() {
rootCmd.AddCommand(p2pCmd, headerCmd)
rootCmd.AddCommand(p2pCmd, headerCmd, edsStoreCmd)
}

var rootCmd = &cobra.Command{
Expand All @@ -26,5 +28,8 @@ func main() {
}

func run() error {
return rootCmd.ExecuteContext(context.Background())
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

return rootCmd.ExecuteContext(ctx)
}
172 changes: 172 additions & 0 deletions libs/edssser/edssser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package edssser

import (
"context"
"errors"
"fmt"
"os"
"sync"
"testing"
"time"

"github.com/ipfs/go-datastore"

"github.com/celestiaorg/celestia-app/pkg/da"

"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
)

type Config struct {
EDSSize int
EDSWrites int
EnableLog bool
LogFilePath string
StatLogFreq int
OpTimeout time.Duration
}

// EDSsser stand for EDS Store Stresser.
type EDSsser struct {
config Config
datastore datastore.Batching
edsstoreMu sync.Mutex
edsstore *eds.Store

statsFileMu sync.Mutex
statsFile *os.File
}

func NewEDSsser(path string, datastore datastore.Batching, cfg Config) (*EDSsser, error) {
edsstore, err := eds.NewStore(path, datastore)
if err != nil {
return nil, err
}

return &EDSsser{
config: cfg,
datastore: datastore,
edsstore: edsstore,
}, nil
}

func (ss *EDSsser) Run(ctx context.Context) (stats Stats, err error) {
ss.edsstoreMu.Lock()
defer ss.edsstoreMu.Unlock()

err = ss.edsstore.Start(ctx)
if err != nil {
return stats, err
}
defer func() {
err = errors.Join(err, ss.edsstore.Stop(ctx))
}()

edsHashes, err := ss.edsstore.List()
if err != nil {
return stats, err
}
fmt.Printf("recovered %d EDSes\n\n", len(edsHashes))

t := &testing.T{}
for toWrite := ss.config.EDSWrites - len(edsHashes); ctx.Err() == nil && toWrite > 0; toWrite-- {
took, err := ss.put(ctx, t)

stats.TotalWritten++
stats.TotalTime += took
if took < stats.MinTime || stats.MinTime == 0 {
stats.MinTime = took
} else if took > stats.MaxTime {
stats.MaxTime = took
}

if ss.config.EnableLog {
if stats.TotalWritten%ss.config.StatLogFreq == 0 {
stats := stats.Finalize()
fmt.Println(stats)
go func() {
err := ss.dumpStat(stats)
if err != nil {
fmt.Printf("error dumping stats: %s\n", err.Error())
}
}()
}
if err != nil {
fmt.Printf("ERROR put: %s, took: %v, at: %v\n", err.Error(), took, time.Now())
continue
}
if took > ss.config.OpTimeout/2 {
fmt.Println("long put", "size", ss.config.EDSSize, "took", took, "at", time.Now())
continue
}

fmt.Println("square written", "size", ss.config.EDSSize, "took", took, "at", time.Now())
}
}
return stats, nil
}

func (ss *EDSsser) dumpStat(stats Stats) (err error) {
ss.statsFileMu.Lock()
defer ss.statsFileMu.Unlock()

ss.statsFile, err = os.Create(ss.config.LogFilePath + "/edssser_stats.txt")
if err != nil {
return err
}

_, err = ss.statsFile.Write([]byte(stats.String()))
if err != nil {
return err
}

return ss.statsFile.Close()
}

type Stats struct {
TotalWritten int
TotalTime, MinTime, MaxTime, AvgTime time.Duration
// Deviation ?
}

func (stats Stats) Finalize() Stats {
if stats.TotalTime != 0 {
stats.AvgTime = stats.TotalTime / time.Duration(stats.TotalWritten)
}
return stats
}

func (stats Stats) String() string {
return fmt.Sprintf(`
TotalWritten %d
TotalWritingTime %v
MaxTime %s
MinTime %s
AvgTime %s
`,
stats.TotalWritten,
stats.TotalTime,
stats.MaxTime,
stats.MinTime,
stats.AvgTime,
)
}

func (ss *EDSsser) put(ctx context.Context, t *testing.T) (time.Duration, error) {
ctx, cancel := context.WithTimeout(ctx, ss.config.OpTimeout)
if ss.config.OpTimeout == 0 {
ctx, cancel = context.WithCancel(ctx)
}
defer cancel()

// divide by 2 to get ODS size as expected by RandEDS
square := edstest.RandEDS(t, ss.config.EDSSize/2)
dah, err := da.NewDataAvailabilityHeader(square)
if err != nil {
return 0, err
}

now := time.Now()
err = ss.edsstore.Put(ctx, dah.Hash(), square)
return time.Since(now), err
}
9 changes: 7 additions & 2 deletions nodebuilder/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/state"
)

// PrintKeyringInfo whether to print keyring information during init.
var PrintKeyringInfo = true

// Init initializes the Node FileSystem Store for the given Node Type 'tp' in the directory under
// 'path'.
func Init(cfg Config, path string, tp node.Type) error {
Expand Down Expand Up @@ -213,8 +216,10 @@ func generateKeys(cfg Config, ksPath string) error {
if err != nil {
return err
}
fmt.Printf("\nNAME: %s\nADDRESS: %s\nMNEMONIC (save this somewhere safe!!!): \n%s\n\n",
keyInfo.Name, addr.String(), mn)
if PrintKeyringInfo {
fmt.Printf("\nNAME: %s\nADDRESS: %s\nMNEMONIC (save this somewhere safe!!!): \n%s\n\n",
keyInfo.Name, addr.String(), mn)
}
return nil
}

Expand Down

0 comments on commit c3d4219

Please sign in to comment.