From ea2ef512c5b2b80d01a8000d0501d29d30c6420a Mon Sep 17 00:00:00 2001 From: Ethen Date: Sun, 8 Sep 2024 18:59:10 -0400 Subject: [PATCH] feat: Redis backend target (#111) * feat: Redis backend target * feat: Redis backend target - update holesky test make target * feat: Redis backend target - address PR feedback * feat: Redis backend target - rebase to new main --- Makefile | 18 ++++++-- README.md | 4 ++ e2e/server_test.go | 45 +++++++++++++++++++ e2e/setup.go | 19 ++++++++ go.mod | 2 + go.sum | 4 ++ mocks/router.go | 28 ++++++++++++ server/config.go | 64 +++++++++++++++++++++++---- server/config_test.go | 14 ++++++ server/flags.go | 2 + server/load_store.go | 87 +++++++++++++++++------------------- server/server.go | 18 ++++++++ store/redis.go | 100 ++++++++++++++++++++++++++++++++++++++++++ store/router.go | 19 ++++++++ store/store.go | 19 +++++--- 15 files changed, 378 insertions(+), 65 deletions(-) create mode 100644 store/redis.go diff --git a/Makefile b/Makefile index c2771c8..624fe9f 100644 --- a/Makefile +++ b/Makefile @@ -26,11 +26,19 @@ docker-build: run-minio: docker run -p 4566:9000 -d -e "MINIO_ROOT_USER=minioadmin" -e "MINIO_ROOT_PASSWORD=minioadmin" --name minio minio/minio server /data +run-redis: + docker run -p 9001:6379 -d --name redis redis + stop-minio: @if [ -n "$$(docker ps -q -f name=minio)" ]; then \ docker stop minio && docker rm minio; \ fi +stop-redis: + @if [ -n "$$(docker ps -q -f name=redis)" ]; then \ + docker stop redis && docker rm redis; \ + fi + run-memstore-server: ./bin/eigenda-proxy --memstore.enabled @@ -40,13 +48,15 @@ clean: test: go test -v ./... -parallel 4 -e2e-test: stop-minio run-minio +e2e-test: stop-minio stop-redis run-minio run-redis $(E2ETEST) && \ - make stop-minio + make stop-minio && \ + make stop-redis -holesky-test: run-minio +holesky-test: stop-minio stop-redis run-minio run-redis $(HOLESKYTEST) && \ - make stop-minio + make stop-minio && \ + make stop-redis .PHONY: lint lint: diff --git a/README.md b/README.md index 846cfc1..9c0f0ce 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,10 @@ In order to disperse to the EigenDA network in production, or at high throughput | `--routing.fallback-targets` | `[]` | `$EIGENDA_PROXY_FALLBACK_TARGETS` | Fall back backend targets. Supports S3. | Backup storage locations to read from in the event of eigenda retrieval failure. | | `--routing.cache-targets` | `[]` | `$EIGENDA_PROXY_CACHE_TARGETS` | Caching targets. Supports S3. | Caches data to backend targets after dispersing to DA, retrieved from before trying read from EigenDA. | | `--s3.timeout` | `5s` | `$EIGENDA_PROXY_S3_TIMEOUT` | timeout for S3 storage operations (e.g. get, put) | +| `--redis.db` | `0` | `$EIGENDA_PROXY_REDIS_DB` | redis database to use after connecting to server | +| `--redis.endpoint` | `""` | `$EIGENDA_PROXY_REDIS_ENDPOINT` | redis endpoint url | +| `--redis.password` | `""` | `$EIGENDA_PROXY_REDIS_PASSWORD` | redis password | +| `--redis.eviction` | `24h0m0s` | `$EIGENDA_PROXY_REDIS_EVICTION` | entry eviction/expiration time | | `--help, -h` | `false` | | Show help. | | `--version, -v` | `false` | | Print the version. | diff --git a/e2e/server_test.go b/e2e/server_test.go index 5c213ce..1514b40 100644 --- a/e2e/server_test.go +++ b/e2e/server_test.go @@ -7,6 +7,7 @@ import ( "github.com/Layr-Labs/eigenda-proxy/client" "github.com/Layr-Labs/eigenda-proxy/e2e" + "github.com/Layr-Labs/eigenda-proxy/store" op_plasma "github.com/ethereum-optimism/optimism/op-plasma" "github.com/stretchr/testify/require" ) @@ -203,6 +204,50 @@ func TestProxyServerCaching(t *testing.T) { } } +func TestProxyServerCachingWithRedis(t *testing.T) { + if !runIntegrationTests && !runTestnetIntegrationTests { + t.Skip("Skipping test as INTEGRATION or TESTNET env var not set") + } + + t.Parallel() + + testCfg := e2e.TestConfig(useMemory()) + testCfg.UseRedisCaching = true + + ts, kill := e2e.CreateTestSuite(t, testCfg) + defer kill() + + cfg := &client.Config{ + URL: ts.Address(), + } + daClient := client.New(cfg) + // 10 kb blob + testPreimage := []byte(e2e.RandString(10_000)) + + t.Log("Setting input data on proxy server...") + blobInfo, err := daClient.SetData(ts.Ctx, testPreimage) + require.NotEmpty(t, blobInfo) + require.NoError(t, err) + + t.Log("Getting input data from proxy server...") + preimage, err := daClient.GetData(ts.Ctx, blobInfo) + require.NoError(t, err) + require.Equal(t, testPreimage, preimage) + + // ensure that read was from cache + redStats, err := ts.Server.GetStoreStats(store.Redis) + require.NoError(t, err) + + require.Equal(t, 1, redStats.Reads) + require.Equal(t, 1, redStats.Entries) + + if useMemory() { // ensure that eigenda was not read from + memStats := ts.Server.GetEigenDAStats() + require.Equal(t, 0, memStats.Reads) + require.Equal(t, 1, memStats.Entries) + } +} + /* Ensure that fallback location is read from when EigenDA blob is not available. This is done by setting the memstore expiration time to 1ms and waiting for the blob to expire diff --git a/e2e/setup.go b/e2e/setup.go index f473b27..3eea3f0 100644 --- a/e2e/setup.go +++ b/e2e/setup.go @@ -36,6 +36,7 @@ type Cfg struct { Expiration time.Duration UseKeccak256ModeS3 bool UseS3Caching bool + UseRedisCaching bool UseS3Fallback bool } @@ -45,10 +46,24 @@ func TestConfig(useMemory bool) *Cfg { Expiration: 14 * 24 * time.Hour, UseKeccak256ModeS3: false, UseS3Caching: false, + UseRedisCaching: false, UseS3Fallback: false, } } +func createRedisConfig(eigendaCfg server.Config) server.CLIConfig { + return server.CLIConfig{ + EigenDAConfig: eigendaCfg, + RedisCfg: store.RedisConfig{ + Endpoint: "127.0.0.1:9001", + Password: "", + DB: 0, + Eviction: 10 * time.Minute, + Profile: true, + }, + } +} + func createS3Config(eigendaCfg server.Config) server.CLIConfig { // generate random string bucketName := "eigenda-proxy-test-" + RandString(10) @@ -141,6 +156,10 @@ func CreateTestSuite(t *testing.T, testCfg *Cfg) (TestSuite, func()) { eigendaCfg.FallbackTargets = []string{"S3"} cfg = createS3Config(eigendaCfg) + case testCfg.UseRedisCaching: + eigendaCfg.CacheTargets = []string{"redis"} + cfg = createRedisConfig(eigendaCfg) + default: cfg = server.CLIConfig{ EigenDAConfig: eigendaCfg, diff --git a/go.mod b/go.mod index 01e2d91..7e110d4 100644 --- a/go.mod +++ b/go.mod @@ -66,6 +66,7 @@ require ( github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect github.com/deepmap/oapi-codegen v1.8.2 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dlclark/regexp2 v1.7.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127 // indirect @@ -88,6 +89,7 @@ require ( github.com/getsentry/sentry-go v0.20.0 // indirect github.com/go-ini/ini v1.67.0 // indirect github.com/go-ole/go-ole v1.3.0 // indirect + github.com/go-redis/redis/v8 v8.11.5 // indirect github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/goccy/go-json v0.10.3 // indirect diff --git a/go.sum b/go.sum index e6035a4..9884de3 100644 --- a/go.sum +++ b/go.sum @@ -226,6 +226,8 @@ github.com/dgraph-io/badger v1.6.2/go.mod h1:JW2yswe3V058sS0kZ2h/AXeDSqFjxnZcRrV github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA= github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= @@ -315,6 +317,8 @@ github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU= github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= diff --git a/mocks/router.go b/mocks/router.go index ab69f12..97c171f 100644 --- a/mocks/router.go +++ b/mocks/router.go @@ -36,6 +36,34 @@ func (m *MockIRouter) EXPECT() *MockIRouterMockRecorder { return m.recorder } +// Caches mocks base method. +func (m *MockIRouter) Caches() []store.PrecomputedKeyStore { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Caches") + ret0, _ := ret[0].([]store.PrecomputedKeyStore) + return ret0 +} + +// Caches indicates an expected call of Caches. +func (mr *MockIRouterMockRecorder) Caches() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Caches", reflect.TypeOf((*MockIRouter)(nil).Caches)) +} + +// Fallbacks mocks base method. +func (m *MockIRouter) Fallbacks() []store.PrecomputedKeyStore { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Fallbacks") + ret0, _ := ret[0].([]store.PrecomputedKeyStore) + return ret0 +} + +// Fallbacks indicates an expected call of Fallbacks. +func (mr *MockIRouterMockRecorder) Fallbacks() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Fallbacks", reflect.TypeOf((*MockIRouter)(nil).Fallbacks)) +} + // Get mocks base method. func (m *MockIRouter) Get(arg0 context.Context, arg1 []byte, arg2 commitments.CommitmentMode) ([]byte, error) { m.ctrl.T.Helper() diff --git a/server/config.go b/server/config.go index f407301..bd4279a 100644 --- a/server/config.go +++ b/server/config.go @@ -41,6 +41,12 @@ const ( MemstorePutLatencyFlagName = "memstore.put-latency" MemstoreGetLatencyFlagName = "memstore.get-latency" + // redis client flags + RedisEndpointFlagName = "redis.endpoint" + RedisPasswordFlagName = "redis.password" + RedisDBFlagName = "redis.db" + RedisEvictionFlagName = "redis.eviction" + // S3 client flags S3CredentialTypeFlagName = "s3.credential-type" // #nosec G101 S3BucketFlagName = "s3.bucket" // #nosec G101 @@ -67,29 +73,28 @@ var ( ) type Config struct { - S3Config store.S3Config - + // eigenda ClientConfig clients.EigenDAClientConfig - // The blob encoding version to use when writing blobs from the high level interface. + // the blob encoding version to use when writing blobs from the high level interface. PutBlobEncodingVersion codecs.BlobEncodingVersion - // ETH vars + // eth vars EthRPC string SvcManagerAddr string EthConfirmationDepth int64 - // KZG vars + // kzg vars CacheDir string G1Path string G2Path string G2PowerOfTauPath string - // Size constraints + // size constraints MaxBlobLength string maxBlobLengthBytes uint64 - // Memstore + // memstore MemstoreEnabled bool MemstoreBlobExpiration time.Duration MemstoreGetLatency time.Duration @@ -98,6 +103,10 @@ type Config struct { // routing FallbackTargets []string CacheTargets []string + + // secondary storage + RedisCfg store.RedisConfig + S3Config store.S3Config } // GetMaxBlobLength ... returns the maximum blob length in bytes @@ -153,6 +162,12 @@ func (cfg *Config) VerificationCfg() *verify.Config { // ReadConfig ... parses the Config from the provided flags or environment variables. func ReadConfig(ctx *cli.Context) Config { cfg := Config{ + RedisCfg: store.RedisConfig{ + Endpoint: ctx.String(RedisEndpointFlagName), + Password: ctx.String(RedisPasswordFlagName), + DB: ctx.Int(RedisDBFlagName), + Eviction: ctx.Duration(RedisEvictionFlagName), + }, S3Config: store.S3Config{ S3CredentialType: store.StringToS3CredentialType(ctx.String(S3CredentialTypeFlagName)), Bucket: ctx.String(S3BucketFlagName), @@ -244,6 +259,10 @@ func (cfg *Config) Check() error { } } + if cfg.RedisCfg.Endpoint == "" && cfg.RedisCfg.Password != "" { + return fmt.Errorf("redis password is set, but endpoint is not") + } + if !cfg.MemstoreEnabled && cfg.ClientConfig.RPC == "" { return fmt.Errorf("eigenda disperser rpc url is not set") } @@ -268,7 +287,7 @@ func (cfg *Config) Check() error { return nil } -// flags used for S3 backend configuration +// s3Flags ... used for S3 backend configuration func s3Flags() []cli.Flag { return []cli.Flag{ &cli.StringFlag{ @@ -319,6 +338,34 @@ func s3Flags() []cli.Flag { } } +// redisFlags ... used for Redis backend configuration +func redisFlags() []cli.Flag { + return []cli.Flag{ + &cli.StringFlag{ + Name: RedisEndpointFlagName, + Usage: "Redis endpoint", + EnvVars: prefixEnvVars("REDIS_ENDPOINT"), + }, + &cli.StringFlag{ + Name: RedisPasswordFlagName, + Usage: "Redis password", + EnvVars: prefixEnvVars("REDIS_PASSWORD"), + }, + &cli.IntFlag{ + Name: RedisDBFlagName, + Usage: "Redis database", + Value: 0, + EnvVars: prefixEnvVars("REDIS_DB"), + }, + &cli.DurationFlag{ + Name: RedisEvictionFlagName, + Usage: "Redis eviction time", + Value: 24 * time.Hour, + EnvVars: prefixEnvVars("REDIS_EVICTION"), + }, + } +} + func CLIFlags() []cli.Flag { // TODO: Decompose all flags into constituent parts based on their respective category / usage flags := []cli.Flag{ @@ -452,5 +499,6 @@ func CLIFlags() []cli.Flag { } flags = append(flags, s3Flags()...) + flags = append(flags, redisFlags()...) return flags } diff --git a/server/config_test.go b/server/config_test.go index f045fd3..eba3e50 100644 --- a/server/config_test.go +++ b/server/config_test.go @@ -11,6 +11,12 @@ import ( func validCfg() *Config { return &Config{ + RedisCfg: store.RedisConfig{ + Endpoint: "localhost:6379", + Password: "password", + DB: 0, + Eviction: 10 * time.Minute, + }, S3Config: store.S3Config{ Bucket: "test-bucket", Path: "", @@ -174,4 +180,12 @@ func TestConfigVerification(t *testing.T) { err := cfg.Check() require.Error(t, err) }) + + t.Run("BadRedisConfiguration", func(t *testing.T) { + cfg := validCfg() + cfg.RedisCfg.Endpoint = "" + + err := cfg.Check() + require.Error(t, err) + }) } diff --git a/server/flags.go b/server/flags.go index 3e4b528..9046e3b 100644 --- a/server/flags.go +++ b/server/flags.go @@ -43,6 +43,7 @@ func init() { } type CLIConfig struct { + RedisCfg store.RedisConfig S3Config store.S3Config EigenDAConfig Config MetricsCfg opmetrics.CLIConfig @@ -51,6 +52,7 @@ type CLIConfig struct { func ReadCLIConfig(ctx *cli.Context) CLIConfig { config := ReadConfig(ctx) return CLIConfig{ + RedisCfg: config.RedisCfg, EigenDAConfig: config, MetricsCfg: opmetrics.ReadCLIConfig(ctx), S3Config: config.S3Config, diff --git a/server/load_store.go b/server/load_store.go index a323973..e73f09d 100644 --- a/server/load_store.go +++ b/server/load_store.go @@ -10,11 +10,41 @@ import ( "github.com/ethereum/go-ethereum/log" ) +// populateTargets ... creates a list of storage backends based on the provided target strings +func populateTargets(targets []string, s3 *store.S3Store, redis *store.RedStore) []store.PrecomputedKeyStore { + stores := make([]store.PrecomputedKeyStore, len(targets)) + + for i, f := range targets { + b := store.StringToBackendType(f) + + switch b { + case store.Redis: + stores[i] = redis + + case store.S3: + stores[i] = s3 + + case store.EigenDA, store.Memory: + panic(fmt.Sprintf("Invalid target for fallback: %s", f)) + + case store.Unknown: + fallthrough + + default: + panic(fmt.Sprintf("Unknown fallback target: %s", f)) + } + } + + return stores +} + // LoadStoreRouter ... creates storage backend clients and instruments them into a storage routing abstraction func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store.IRouter, error) { // create S3 backend store (if enabled) var err error var s3 *store.S3Store + var redis *store.RedStore + if cfg.S3Config.Bucket != "" && cfg.S3Config.Endpoint != "" { log.Info("Using S3 backend") s3, err = store.NewS3(cfg.S3Config) @@ -23,6 +53,15 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store. } } + if cfg.RedisCfg.Endpoint != "" { + log.Info("Using Redis backend") + // create Redis backend store + redis, err = store.NewRedisStore(&cfg.RedisCfg) + if err != nil { + return nil, err + } + } + // create cert/data verification type daCfg := cfg.EigenDAConfig vCfg := daCfg.VerificationCfg() @@ -78,52 +117,8 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store. } // determine read fallbacks - fallbacks := make([]store.PrecomputedKeyStore, len(cfg.EigenDAConfig.FallbackTargets)) - - for i, f := range cfg.EigenDAConfig.FallbackTargets { - b := store.StringToBackendType(f) - - switch b { - case store.S3: - fallbacks[i] = s3 - - case store.EigenDA, store.Memory: - return nil, fmt.Errorf("EigenDA cannot be used as a fallback target") - - case store.Redis: - return nil, fmt.Errorf("redis is not supported yet") - - case store.Unknown: - fallthrough - - default: - panic(fmt.Sprintf("Unknown fallback target: %s", f)) - } - } - - // determine caches for priority reads - caches := make([]store.PrecomputedKeyStore, len(cfg.EigenDAConfig.CacheTargets)) - - for i, f := range cfg.EigenDAConfig.CacheTargets { - b := store.StringToBackendType(f) - - switch b { - case store.S3: - caches[i] = s3 - - case store.EigenDA, store.Memory: - return nil, fmt.Errorf("EigenDA cannot be used as a cache target") - - case store.Redis: - return nil, fmt.Errorf("redis is not supported yet") - - case store.Unknown: - fallthrough - - default: - log.Warn("Unknown fallback target", "target", f) - } - } + fallbacks := populateTargets(cfg.EigenDAConfig.FallbackTargets, s3, redis) + caches := populateTargets(cfg.EigenDAConfig.CacheTargets, s3, redis) log.Info("Creating storage router", "eigenda backend type", eigenda != nil, "s3 backend type", s3 != nil) return store.NewRouter(eigenda, s3, log, caches, fallbacks) diff --git a/server/server.go b/server/server.go index e3cea9d..5c41c98 100644 --- a/server/server.go +++ b/server/server.go @@ -301,3 +301,21 @@ func (svr *Server) GetEigenDAStats() *store.Stats { func (svr *Server) GetS3Stats() *store.Stats { return svr.router.GetS3Store().Stats() } + +func (svr *Server) GetStoreStats(bt store.BackendType) (*store.Stats, error) { + // first check if the store is a cache + for _, cache := range svr.router.Caches() { + if cache.BackendType() == bt { + return cache.Stats(), nil + } + } + + // then check if the store is a fallback + for _, fallback := range svr.router.Fallbacks() { + if fallback.BackendType() == bt { + return fallback.Stats(), nil + } + } + + return nil, fmt.Errorf("store not found") +} diff --git a/store/redis.go b/store/redis.go new file mode 100644 index 0000000..4985698 --- /dev/null +++ b/store/redis.go @@ -0,0 +1,100 @@ +package store + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/go-redis/redis/v8" +) + +// RedisConfig ... user configurable +type RedisConfig struct { + Endpoint string + Password string + DB int + Eviction time.Duration + Profile bool +} + +// RedStore ... Redis storage backend implementation (This not safe for concurrent usage) +type RedStore struct { + eviction time.Duration + + client *redis.Client + + profile bool + reads int + entries int +} + +var _ PrecomputedKeyStore = (*RedStore)(nil) + +// NewRedisStore ... constructor +func NewRedisStore(cfg *RedisConfig) (*RedStore, error) { + client := redis.NewClient(&redis.Options{ + Addr: cfg.Endpoint, + Password: cfg.Password, + DB: cfg.DB, + }) + + // ensure server can be pinged using potential client connection + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + cmd := client.Ping(ctx) + if cmd.Err() != nil { + return nil, fmt.Errorf("failed to ping redis server: %w", cmd.Err()) + } + + return &RedStore{ + eviction: cfg.Eviction, + client: client, + profile: cfg.Profile, + reads: 0, + }, nil +} + +// Get ... retrieves a value from the Redis store. Returns nil if the key is not found vs. an error +// if the key is found but the value is not retrievable. +func (r *RedStore) Get(ctx context.Context, key []byte) ([]byte, error) { + value, err := r.client.Get(ctx, string(key)).Result() + if errors.Is(err, redis.Nil) { // key DNE + return nil, nil + } else if err != nil { + return nil, err + } + + if r.profile { + r.reads++ + } + + // cast value to byte slice + return []byte(value), nil +} + +// Put ... inserts a value into the Redis store +func (r *RedStore) Put(ctx context.Context, key []byte, value []byte) error { + err := r.client.Set(ctx, string(key), string(value), r.eviction).Err() + if err == nil && r.profile { + r.entries++ + } + + return err +} + +func (r *RedStore) Verify(_ []byte, _ []byte) error { + return nil +} + +func (r *RedStore) BackendType() BackendType { + return Redis +} + +func (r *RedStore) Stats() *Stats { + return &Stats{ + Entries: r.entries, + Reads: r.reads, + } +} diff --git a/store/router.go b/store/router.go index fe3a4a9..ad2805c 100644 --- a/store/router.go +++ b/store/router.go @@ -19,6 +19,8 @@ type IRouter interface { GetEigenDAStore() KeyGeneratedStore GetS3Store() PrecomputedKeyStore + Caches() []PrecomputedKeyStore + Fallbacks() []PrecomputedKeyStore } // Router ... storage backend routing layer @@ -80,6 +82,7 @@ func (r *Router) Get(ctx context.Context, key []byte, cm commitments.CommitmentM if err == nil { return data, nil } + r.log.Warn("Failed to read from cache targets", "err", err) } @@ -198,6 +201,12 @@ func (r *Router) multiSourceRead(ctx context.Context, commitment []byte, fallbac r.log.Warn("Failed to read from redundant target", "backend", src.BackendType(), "err", err) continue } + + if data == nil { + r.log.Debug("No data found in redundant target", "backend", src.BackendType()) + continue + } + // verify cert:data using EigenDA verification checks err = r.eigenda.Verify(commitment, data) if err != nil { @@ -251,3 +260,13 @@ func (r *Router) GetEigenDAStore() KeyGeneratedStore { func (r *Router) GetS3Store() PrecomputedKeyStore { return r.s3 } + +// Caches ... +func (r *Router) Caches() []PrecomputedKeyStore { + return r.caches +} + +// Fallbacks ... +func (r *Router) Fallbacks() []PrecomputedKeyStore { + return r.fallbacks +} diff --git a/store/store.go b/store/store.go index 50dc9d2..f1ad60a 100644 --- a/store/store.go +++ b/store/store.go @@ -1,6 +1,9 @@ package store -import "context" +import ( + "context" + "strings" +) type BackendType uint8 @@ -31,16 +34,18 @@ func (b BackendType) String() string { } func StringToBackendType(s string) BackendType { - switch s { - case "EigenDA": + lower := strings.ToLower(s) + + switch lower { + case "eigenda": return EigenDA - case "Memory": + case "memory": return Memory - case "S3": + case "s3": return S3 - case "Redis": + case "redis": return Redis - case "Unknown": + case "unknown": fallthrough default: return Unknown