Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat memstore artificial latency #114

Merged
merged 9 commits into from
Sep 6, 2024
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ stop-minio:
docker stop minio && docker rm minio; \
fi

run-server:
./bin/eigenda-proxy
run-memstore-server:
./bin/eigenda-proxy --memstore.enabled

clean:
rm bin/eigenda-proxy
Expand Down
3 changes: 0 additions & 3 deletions cmd/server/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import (
)

func StartProxySvr(cliCtx *cli.Context) error {
if err := server.CheckRequired(cliCtx); err != nil {
return err
}
cfg := server.ReadCLIConfig(cliCtx)
if err := cfg.Check(); err != nil {
return err
Expand Down
18 changes: 18 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
// memstore flags
MemstoreFlagName = "memstore.enabled"
MemstoreExpirationFlagName = "memstore.expiration"
MemstorePutLatencyFlagName = "memstore.put-latency"
MemstoreGetLatencyFlagName = "memstore.get-latency"
epociask marked this conversation as resolved.
Show resolved Hide resolved

// S3 client flags
S3CredentialTypeFlagName = "s3.credential-type" // #nosec G101
Expand Down Expand Up @@ -90,6 +92,8 @@ type Config struct {
// Memstore
MemstoreEnabled bool
MemstoreBlobExpiration time.Duration
MemstoreGetLatency time.Duration
MemstorePutLatency time.Duration

// routing
FallbackTargets []string
Expand Down Expand Up @@ -179,6 +183,8 @@ func ReadConfig(ctx *cli.Context) Config {
EthConfirmationDepth: ctx.Int64(EthConfirmationDepthFlagName),
MemstoreEnabled: ctx.Bool(MemstoreFlagName),
MemstoreBlobExpiration: ctx.Duration(MemstoreExpirationFlagName),
MemstoreGetLatency: ctx.Duration(MemstoreGetLatencyFlagName),
MemstorePutLatency: ctx.Duration(MemstorePutLatencyFlagName),
FallbackTargets: ctx.StringSlice(FallbackTargets),
CacheTargets: ctx.StringSlice(CacheTargets),
}
Expand Down Expand Up @@ -419,6 +425,18 @@ func CLIFlags() []cli.Flag {
Value: 25 * time.Minute,
EnvVars: []string{"MEMSTORE_EXPIRATION"},
},
&cli.DurationFlag{
Name: MemstorePutLatencyFlagName,
Usage: "Artificial latency added for memstore backend to mimic EigenDA's dispersal latency.",
Value: 0,
EnvVars: []string{"MEMSTORE_PUT_LATENCY"},
},
&cli.DurationFlag{
Name: MemstoreGetLatencyFlagName,
Usage: "Artificial latency added for memstore backend to mimic EigenDA's retrieval latency.",
Value: 0,
EnvVars: []string{"MEMSTORE_GET_LATENCY"},
},
&cli.StringSliceFlag{
Name: FallbackTargets,
Usage: "List of read fallback targets to rollover to if cert can't be read from EigenDA.",
Expand Down
19 changes: 4 additions & 15 deletions server/flags.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package server

import (
"fmt"

"github.com/Layr-Labs/eigenda-proxy/store"
"github.com/urfave/cli/v2"

Expand All @@ -26,7 +24,7 @@ var (
ListenAddrFlag = &cli.StringFlag{
Name: ListenAddrFlagName,
Usage: "server listening address",
Value: "127.0.0.1",
Value: "0.0.0.0",
EnvVars: prefixEnvVars("ADDR"),
}
PortFlag = &cli.IntFlag{
Expand All @@ -37,13 +35,13 @@ var (
}
)

var requiredFlags = []cli.Flag{
var requiredFlags = []cli.Flag{}
epociask marked this conversation as resolved.
Show resolved Hide resolved

var optionalFlags = []cli.Flag{
ListenAddrFlag,
PortFlag,
}

var optionalFlags = []cli.Flag{}

func init() {
optionalFlags = append(optionalFlags, oplog.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, CLIFlags()...)
Expand Down Expand Up @@ -76,12 +74,3 @@ func (c CLIConfig) Check() error {
}
return nil
}

func CheckRequired(ctx *cli.Context) error {
for _, f := range requiredFlags {
if !ctx.IsSet(f.Names()[0]) {
return fmt.Errorf("flag %s is required", f.Names()[0])
}
}
return nil
}
2 changes: 1 addition & 1 deletion server/load_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store.
var eigenda store.KeyGeneratedStore
if cfg.EigenDAConfig.MemstoreEnabled {
log.Info("Using mem-store backend for EigenDA")
eigenda, err = store.NewMemStore(ctx, verifier, log, maxBlobLength, cfg.EigenDAConfig.MemstoreBlobExpiration)
eigenda, err = store.NewMemStore(ctx, verifier, log, maxBlobLength, cfg.EigenDAConfig.MemstoreBlobExpiration, cfg.EigenDAConfig.MemstorePutLatency, cfg.EigenDAConfig.MemstoreGetLatency)
epociask marked this conversation as resolved.
Show resolved Hide resolved
} else {
var client *clients.EigenDAClient
log.Info("Using EigenDA backend")
Expand Down
20 changes: 14 additions & 6 deletions store/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ EigenDA operators.
type MemStore struct {
sync.RWMutex

l log.Logger
keyStarts map[string]time.Time
store map[string][]byte
verifier *verify.Verifier
codec codecs.BlobCodec
l log.Logger
keyStarts map[string]time.Time
store map[string][]byte
verifier *verify.Verifier
codec codecs.BlobCodec
putLatency time.Duration
getLatency time.Duration

maxBlobSizeBytes uint64
blobExpiration time.Duration
Expand All @@ -45,7 +47,8 @@ var _ KeyGeneratedStore = (*MemStore)(nil)

// NewMemStore ... constructor
func NewMemStore(ctx context.Context, verifier *verify.Verifier, l log.Logger,
maxBlobSizeBytes uint64, blobExpiration time.Duration) (*MemStore, error) {
maxBlobSizeBytes uint64, blobExpiration time.Duration,
putLatency, getLatency time.Duration) (*MemStore, error) {
store := &MemStore{
l: l,
keyStarts: make(map[string]time.Time),
Expand All @@ -54,6 +57,9 @@ func NewMemStore(ctx context.Context, verifier *verify.Verifier, l log.Logger,
codec: codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()),
maxBlobSizeBytes: maxBlobSizeBytes,
blobExpiration: blobExpiration,
// artificial latency added for memstore backend to mimic eigenda's latency
putLatency: putLatency,
getLatency: getLatency,
}

if store.blobExpiration != 0 {
Expand Down Expand Up @@ -97,6 +103,7 @@ func (e *MemStore) pruneExpired() {

// Get fetches a value from the store.
func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {
time.Sleep(e.getLatency)
e.reads++
e.RLock()
defer e.RUnlock()
Expand Down Expand Up @@ -124,6 +131,7 @@ func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {

// Put inserts a value into the store.
func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) {
time.Sleep(e.putLatency)
if uint64(len(value)) > e.maxBlobSizeBytes {
return nil, fmt.Errorf("blob is larger than max blob size: blob length %d, max blob size %d", len(value), e.maxBlobSizeBytes)
}
Expand Down
55 changes: 55 additions & 0 deletions store/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestGetSet(t *testing.T) {
log.New(),
1024*1024*2,
time.Hour*1000,
0, 0,
)

require.NoError(t, err)
Expand Down Expand Up @@ -85,6 +86,7 @@ func TestExpiration(t *testing.T) {
log.New(),
1024*1024*2,
time.Millisecond*10,
0, 0,
)

require.NoError(t, err)
Expand All @@ -100,3 +102,56 @@ func TestExpiration(t *testing.T) {
require.Error(t, err)

}

func TestLatency(t *testing.T) {
t.Parallel()

putLatency := 1 * time.Second
getLatency := 1 * time.Second

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kzgConfig := &kzg.KzgConfig{
G1Path: "../resources/g1.point",
G2PowerOf2Path: "../resources/g2.point.powerOf2",
CacheDir: "../resources/SRSTables",
SRSOrder: 3000,
SRSNumberToLoad: 3000,
NumWorker: uint64(runtime.GOMAXPROCS(0)),
}
epociask marked this conversation as resolved.
Show resolved Hide resolved

cfg := &verify.Config{
Verify: false,
KzgConfig: kzgConfig,
}

verifier, err := verify.NewVerifier(cfg, nil)
require.NoError(t, err)

ms, err := NewMemStore(
ctx,
verifier,
log.New(),
1024*1024*2,
time.Millisecond*10,
putLatency, getLatency,
)

require.NoError(t, err)

preimage := []byte(testPreimage)
timeBeforePut := time.Now()
key, err := ms.Put(ctx, preimage)
require.NoError(t, err)
require.GreaterOrEqual(t, time.Since(timeBeforePut), putLatency)

// sleep 1 second and verify that older blob entries are removed
time.Sleep(time.Second * 1)
epociask marked this conversation as resolved.
Show resolved Hide resolved

timeBeforeGet := time.Now()
_, err = ms.Get(ctx, key)
require.Error(t, err)
require.GreaterOrEqual(t, time.Since(timeBeforeGet), getLatency)

}
Loading