
Feat memstore artificial latency #114

Merged: 9 commits, Sep 6, 2024
4 changes: 2 additions & 2 deletions Makefile
@@ -31,8 +31,8 @@ stop-minio:
 		docker stop minio && docker rm minio; \
 	fi
 
-run-server:
-	./bin/eigenda-proxy
+run-memstore-server:
+	./bin/eigenda-proxy --memstore.enabled
 
 clean:
 	rm bin/eigenda-proxy
2 changes: 2 additions & 0 deletions README.md
@@ -48,6 +48,8 @@ In order to disperse to the EigenDA network in production, or at high throughput
 | `--log.pid` | `false` | `$EIGENDA_PROXY_LOG_PID` | Show pid in the log. |
 | `--memstore.enabled` | `false` | `$MEMSTORE_ENABLED` | Whether to use mem-store for DA logic. |
 | `--memstore.expiration` | `25m0s` | `$MEMSTORE_EXPIRATION` | Duration that a mem-store blob/commitment pair are allowed to live. |
+| `--memstore.put-latency` | `0` | `$MEMSTORE_PUT_LATENCY` | Artificial latency added for memstore backend to mimic EigenDA's dispersal latency. |
+| `--memstore.get-latency` | `0` | `$MEMSTORE_GET_LATENCY` | Artificial latency added for memstore backend to mimic EigenDA's retrieval latency. |
 | `--metrics.addr` | `"0.0.0.0"` | `$EIGENDA_PROXY_METRICS_ADDR` | Metrics listening address. |
 | `--metrics.enabled` | `false` | `$EIGENDA_PROXY_METRICS_ENABLED` | Enable the metrics server. |
 | `--metrics.port` | `7300` | `$EIGENDA_PROXY_METRICS_PORT` | Metrics listening port. |
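Both new flags take Go duration strings (parsed by `time.ParseDuration`, e.g. `300ms`, `2s`) and default to `0`, meaning no artificial delay. An illustrative invocation (values chosen for the example, not defaults): `./bin/eigenda-proxy --memstore.enabled --memstore.put-latency=10s --memstore.get-latency=300ms`, or equivalently via the `MEMSTORE_PUT_LATENCY` and `MEMSTORE_GET_LATENCY` environment variables.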
3 changes: 0 additions & 3 deletions cmd/server/entrypoint.go
@@ -13,9 +13,6 @@ import (
 )
 
 func StartProxySvr(cliCtx *cli.Context) error {
-	if err := server.CheckRequired(cliCtx); err != nil {
-		return err
-	}
 	cfg := server.ReadCLIConfig(cliCtx)
 	if err := cfg.Check(); err != nil {
 		return err
18 changes: 18 additions & 0 deletions server/config.go
@@ -38,6 +38,8 @@ const (
 	// memstore flags
 	MemstoreFlagName           = "memstore.enabled"
 	MemstoreExpirationFlagName = "memstore.expiration"
+	MemstorePutLatencyFlagName = "memstore.put-latency"
+	MemstoreGetLatencyFlagName = "memstore.get-latency"
 
 	// S3 client flags
 	S3CredentialTypeFlagName = "s3.credential-type" // #nosec G101
@@ -90,6 +92,8 @@ type Config struct {
 	// Memstore
 	MemstoreEnabled        bool
 	MemstoreBlobExpiration time.Duration
+	MemstoreGetLatency     time.Duration
+	MemstorePutLatency     time.Duration
 
 	// routing
 	FallbackTargets []string
@@ -179,6 +183,8 @@ func ReadConfig(ctx *cli.Context) Config {
 		EthConfirmationDepth:   ctx.Int64(EthConfirmationDepthFlagName),
 		MemstoreEnabled:        ctx.Bool(MemstoreFlagName),
 		MemstoreBlobExpiration: ctx.Duration(MemstoreExpirationFlagName),
+		MemstoreGetLatency:     ctx.Duration(MemstoreGetLatencyFlagName),
+		MemstorePutLatency:     ctx.Duration(MemstorePutLatencyFlagName),
 		FallbackTargets:        ctx.StringSlice(FallbackTargets),
 		CacheTargets:           ctx.StringSlice(CacheTargets),
 	}
@@ -419,6 +425,18 @@ func CLIFlags() []cli.Flag {
 			Value:   25 * time.Minute,
 			EnvVars: []string{"MEMSTORE_EXPIRATION"},
 		},
+		&cli.DurationFlag{
+			Name:    MemstorePutLatencyFlagName,
+			Usage:   "Artificial latency added for memstore backend to mimic EigenDA's dispersal latency.",
+			Value:   0,
+			EnvVars: []string{"MEMSTORE_PUT_LATENCY"},
+		},
+		&cli.DurationFlag{
+			Name:    MemstoreGetLatencyFlagName,
+			Usage:   "Artificial latency added for memstore backend to mimic EigenDA's retrieval latency.",
+			Value:   0,
+			EnvVars: []string{"MEMSTORE_GET_LATENCY"},
+		},
 		&cli.StringSliceFlag{
 			Name:  FallbackTargets,
 			Usage: "List of read fallback targets to rollover to if cert can't be read from EigenDA.",
41 changes: 10 additions & 31 deletions server/flags.go
@@ -1,8 +1,6 @@
 package server
 
 import (
-	"fmt"
-
 	"github.com/Layr-Labs/eigenda-proxy/store"
 	"github.com/urfave/cli/v2"
 
@@ -22,38 +20,28 @@ func prefixEnvVars(name string) []string {
 	return opservice.PrefixEnvVar(EnvVarPrefix, name)
 }
 
-var (
-	ListenAddrFlag = &cli.StringFlag{
+// Flags contains the list of configuration options available to the binary.
+var Flags = []cli.Flag{
+	&cli.StringFlag{
 		Name:    ListenAddrFlagName,
 		Usage:   "server listening address",
-		Value:   "127.0.0.1",
+		Value:   "0.0.0.0",
 		EnvVars: prefixEnvVars("ADDR"),
-	}
-	PortFlag = &cli.IntFlag{
+	},
+	&cli.IntFlag{
 		Name:    PortFlagName,
 		Usage:   "server listening port",
 		Value:   3100,
 		EnvVars: prefixEnvVars("PORT"),
-	}
-)
-
-var requiredFlags = []cli.Flag{
-	ListenAddrFlag,
-	PortFlag,
+	},
 }
 
-var optionalFlags = []cli.Flag{}
-
 func init() {
-	optionalFlags = append(optionalFlags, oplog.CLIFlags(EnvVarPrefix)...)
-	optionalFlags = append(optionalFlags, CLIFlags()...)
-	optionalFlags = append(optionalFlags, opmetrics.CLIFlags(EnvVarPrefix)...)
-	Flags = append(requiredFlags, optionalFlags...) //nolint:gocritic // this is a global variable
+	Flags = append(Flags, oplog.CLIFlags(EnvVarPrefix)...)
+	Flags = append(Flags, CLIFlags()...)
+	Flags = append(Flags, opmetrics.CLIFlags(EnvVarPrefix)...)
 }
 
-// Flags contains the list of configuration options available to the binary.
-var Flags []cli.Flag
-
 type CLIConfig struct {
 	S3Config      store.S3Config
 	EigenDAConfig Config
@@ -76,12 +64,3 @@ func (c CLIConfig) Check() error {
 	}
 	return nil
 }
-
-func CheckRequired(ctx *cli.Context) error {
-	for _, f := range requiredFlags {
-		if !ctx.IsSet(f.Names()[0]) {
-			return fmt.Errorf("flag %s is required", f.Names()[0])
-		}
-	}
-	return nil
-}
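Because the listen address and port now carry usable defaults (`0.0.0.0` and `3100`) directly on their flag definitions, urfave/cli always yields a valid value for them; that is what allows this PR to delete `CheckRequired` here along with its call site in `cmd/server/entrypoint.go`.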
7 changes: 6 additions & 1 deletion server/load_store.go
@@ -47,7 +47,12 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store.
 	var eigenda store.KeyGeneratedStore
 	if cfg.EigenDAConfig.MemstoreEnabled {
 		log.Info("Using mem-store backend for EigenDA")
-		eigenda, err = store.NewMemStore(ctx, verifier, log, maxBlobLength, cfg.EigenDAConfig.MemstoreBlobExpiration)
+		eigenda, err = store.NewMemStore(ctx, verifier, log, store.MemStoreConfig{
+			MaxBlobSizeBytes: maxBlobLength,
+			BlobExpiration:   cfg.EigenDAConfig.MemstoreBlobExpiration,
+			PutLatency:       cfg.EigenDAConfig.MemstorePutLatency,
+			GetLatency:       cfg.EigenDAConfig.MemstoreGetLatency,
+		})
 	} else {
 		var client *clients.EigenDAClient
 		log.Info("Using EigenDA backend")
43 changes: 26 additions & 17 deletions store/memory.go
@@ -22,6 +22,14 @@ const (
 	DefaultPruneInterval = 500 * time.Millisecond
 )
 
+type MemStoreConfig struct {
+	MaxBlobSizeBytes uint64
+	BlobExpiration   time.Duration
+	// artificial latency added for memstore backend to mimic eigenda's latency
+	PutLatency time.Duration
+	GetLatency time.Duration
+}
+
 /*
 MemStore is a simple in-memory store for blobs which uses an expiration
 time to evict blobs to best emulate the ephemeral nature of blobs dispersed to
@@ -30,34 +38,33 @@ EigenDA operators.
 type MemStore struct {
 	sync.RWMutex
 
+	config    MemStoreConfig
 	l         log.Logger
 	keyStarts map[string]time.Time
 	store     map[string][]byte
 	verifier  *verify.Verifier
 	codec     codecs.BlobCodec
 
-	maxBlobSizeBytes uint64
-	blobExpiration   time.Duration
-	reads            int
+	reads int
 }
 
 var _ KeyGeneratedStore = (*MemStore)(nil)
 
 // NewMemStore ... constructor
-func NewMemStore(ctx context.Context, verifier *verify.Verifier, l log.Logger,
-	maxBlobSizeBytes uint64, blobExpiration time.Duration) (*MemStore, error) {
+func NewMemStore(
+	ctx context.Context, verifier *verify.Verifier, l log.Logger, config MemStoreConfig,
+) (*MemStore, error) {
 	store := &MemStore{
-		l:                l,
-		keyStarts:        make(map[string]time.Time),
-		store:            make(map[string][]byte),
-		verifier:         verifier,
-		codec:            codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()),
-		maxBlobSizeBytes: maxBlobSizeBytes,
-		blobExpiration:   blobExpiration,
+		l:         l,
+		config:    config,
+		keyStarts: make(map[string]time.Time),
+		store:     make(map[string][]byte),
+		verifier:  verifier,
+		codec:     codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()),
 	}
 
-	if store.blobExpiration != 0 {
-		l.Info("memstore expiration enabled", "time", store.blobExpiration)
+	if store.config.BlobExpiration != 0 {
+		l.Info("memstore expiration enabled", "time", store.config.BlobExpiration)
 		go store.EventLoop(ctx)
 	}
 
@@ -86,7 +93,7 @@ func (e *MemStore) pruneExpired() {
 	defer e.Unlock()
 
 	for commit, dur := range e.keyStarts {
-		if time.Since(dur) >= e.blobExpiration {
+		if time.Since(dur) >= e.config.BlobExpiration {
 			delete(e.keyStarts, commit)
 			delete(e.store, commit)
 
@@ -97,6 +104,7 @@
 
 // Get fetches a value from the store.
 func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {
+	time.Sleep(e.config.GetLatency)
 	e.reads++
 	e.RLock()
 	defer e.RUnlock()
@@ -124,8 +132,9 @@ func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {
 
 // Put inserts a value into the store.
 func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) {
-	if uint64(len(value)) > e.maxBlobSizeBytes {
-		return nil, fmt.Errorf("blob is larger than max blob size: blob length %d, max blob size %d", len(value), e.maxBlobSizeBytes)
+	time.Sleep(e.config.PutLatency)
+	if uint64(len(value)) > e.config.MaxBlobSizeBytes {
+		return nil, fmt.Errorf("blob is larger than max blob size: blob length %d, max blob size %d", len(value), e.config.MaxBlobSizeBytes)
 	}
 
 	e.Lock()
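The mechanism is deliberately simple: each operation sleeps for the configured duration before taking any locks or doing any work, and the zero default makes the sleep a no-op. A minimal standalone sketch of the pattern, using toy types rather than the actual eigenda-proxy API:

package main

import (
	"fmt"
	"time"
)

// LatencyConfig mirrors the shape of the PR's MemStoreConfig latency fields.
type LatencyConfig struct {
	PutLatency time.Duration // mimics dispersal latency
	GetLatency time.Duration // mimics retrieval latency
}

// ToyStore is a simplified in-memory store with injected latency.
type ToyStore struct {
	cfg  LatencyConfig
	data map[string][]byte
}

// Put sleeps for the configured put latency, then stores the value.
func (s *ToyStore) Put(key string, value []byte) {
	time.Sleep(s.cfg.PutLatency) // returns immediately when PutLatency == 0
	s.data[key] = value
}

// Get sleeps for the configured get latency, then reads the value.
func (s *ToyStore) Get(key string) []byte {
	time.Sleep(s.cfg.GetLatency)
	return s.data[key]
}

func main() {
	s := &ToyStore{
		cfg:  LatencyConfig{PutLatency: 2 * time.Second, GetLatency: 500 * time.Millisecond},
		data: make(map[string][]byte),
	}
	start := time.Now()
	s.Put("commitment", []byte("blob"))
	fmt.Println("put took", time.Since(start)) // roughly 2s
}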
95 changes: 61 additions & 34 deletions store/memory_test.go
@@ -16,33 +16,41 @@ const (
 	testPreimage = "Four score and seven years ago"
 )
 
-func TestGetSet(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	kzgConfig := &kzg.KzgConfig{
-		G1Path:          "../resources/g1.point",
-		G2PowerOf2Path:  "../resources/g2.point.powerOf2",
-		CacheDir:        "../resources/SRSTables",
-		SRSOrder:        3000,
-		SRSNumberToLoad: 3000,
-		NumWorker:       uint64(runtime.GOMAXPROCS(0)),
+func getDefaultMemStoreTestConfig() MemStoreConfig {
+	return MemStoreConfig{
+		MaxBlobSizeBytes: 1024 * 1024,
+		BlobExpiration:   0,
+		PutLatency:       0,
+		GetLatency:       0,
 	}
+}
 
-	cfg := &verify.Config{
-		Verify:    false,
-		KzgConfig: kzgConfig,
+func getDefaultVerifierTestConfig() *verify.Config {
+	return &verify.Config{
+		Verify: false,
+		KzgConfig: &kzg.KzgConfig{
+			G1Path:          "../resources/g1.point",
+			G2PowerOf2Path:  "../resources/g2.point.powerOf2",
+			CacheDir:        "../resources/SRSTables",
+			SRSOrder:        3000,
+			SRSNumberToLoad: 3000,
+			NumWorker:       uint64(runtime.GOMAXPROCS(0)),
+		},
 	}
+}
 
-	verifier, err := verify.NewVerifier(cfg, nil)
+func TestGetSet(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	verifier, err := verify.NewVerifier(getDefaultVerifierTestConfig(), nil)
 	require.NoError(t, err)
 
 	ms, err := NewMemStore(
 		ctx,
 		verifier,
 		log.New(),
-		1024*1024*2,
-		time.Hour*1000,
+		getDefaultMemStoreTestConfig(),
 	)
 
 	require.NoError(t, err)
@@ -62,29 +70,16 @@ func TestExpiration(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	kzgConfig := &kzg.KzgConfig{
-		G1Path:          "../resources/g1.point",
-		G2PowerOf2Path:  "../resources/g2.point.powerOf2",
-		CacheDir:        "../resources/SRSTables",
-		SRSOrder:        3000,
-		SRSNumberToLoad: 3000,
-		NumWorker:       uint64(runtime.GOMAXPROCS(0)),
-	}
-
-	cfg := &verify.Config{
-		Verify:    false,
-		KzgConfig: kzgConfig,
-	}
-
-	verifier, err := verify.NewVerifier(cfg, nil)
+	verifier, err := verify.NewVerifier(getDefaultVerifierTestConfig(), nil)
 	require.NoError(t, err)
 
+	memstoreConfig := getDefaultMemStoreTestConfig()
+	memstoreConfig.BlobExpiration = 10 * time.Millisecond
 	ms, err := NewMemStore(
 		ctx,
 		verifier,
 		log.New(),
-		1024*1024*2,
-		time.Millisecond*10,
+		memstoreConfig,
 	)
 
 	require.NoError(t, err)
Expand All @@ -100,3 +95,35 @@ func TestExpiration(t *testing.T) {
require.Error(t, err)

}

func TestLatency(t *testing.T) {
t.Parallel()

putLatency := 1 * time.Second
getLatency := 1 * time.Second

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

verifier, err := verify.NewVerifier(getDefaultVerifierTestConfig(), nil)
require.NoError(t, err)

config := getDefaultMemStoreTestConfig()
config.PutLatency = putLatency
config.GetLatency = getLatency
ms, err := NewMemStore(ctx, verifier, log.New(), config)

require.NoError(t, err)

preimage := []byte(testPreimage)
timeBeforePut := time.Now()
key, err := ms.Put(ctx, preimage)
require.NoError(t, err)
require.GreaterOrEqual(t, time.Since(timeBeforePut), putLatency)

timeBeforeGet := time.Now()
_, err = ms.Get(ctx, key)
require.NoError(t, err)
require.GreaterOrEqual(t, time.Since(timeBeforeGet), getLatency)

}
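The `GreaterOrEqual` assertions hold because `time.Sleep` pauses for at least the requested duration, so the measured wall time of each call is a reliable lower bound on the injected latency.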