diff --git a/docker-compose.yaml b/docker-compose.yaml
index 559a0f54..7416ed8f 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -1,22 +1,82 @@
+## The following runs a proxy instance that uses
+## Redis for storage caching and S3 (MinIO) as a
+## storage failover target
+
 services:
+  ## Used as secondary read failover target
+  minio:
+    image: minio/minio:latest
+    container_name: minio
+    environment:
+      - MINIO_ROOT_USER=minioadmin
+      - MINIO_ROOT_PASSWORD=minioadmin
+    ports:
+      - "9000:9000"
+      - "9001:9001"
+    command: server /data
+    volumes:
+      - minio_data:/data
+
+  minio-init:
+    ## Seed test bucket
+    image: minio/mc:latest
+    depends_on:
+      - minio
+    entrypoint: ["/bin/sh", "-c", "/usr/bin/create-bucket.sh"]
+    volumes:
+      - ./scripts/create-test-s3-bucket.sh:/usr/bin/create-bucket.sh
+
+  redis:
+    image: redis:latest
+    container_name: redis
+    command: redis-server --requirepass redispassword
+    environment:
+      - REDIS_PASSWORD=redispassword
+    ports:
+      - "6379:6379"
+
   eigenda_proxy:
+    depends_on:
+      - minio-init
     build:
       context: .
       dockerfile: Dockerfile
     container_name: eigenda-proxy
     environment:
+      - EIGENDA_PROXY_LOG_LEVEL=debug
       - EIGENDA_PROXY_ADDR=0.0.0.0
       - EIGENDA_PROXY_PORT=4242
-      - EIGENDA_PROXY_MEMSTORE_ENABLED=false
+      ## Turn this off to talk to the actual EigenDA network
+      - EIGENDA_PROXY_MEMSTORE_ENABLED=true
       - EIGENDA_PROXY_MEMSTORE_EXPIRATION=45m
-      - EIGENDA_PROXY_EIGENDA_SIGNER_PRIVATE_KEY_HEX=$PRIVATE_KEY
+      - EIGENDA_PROXY_EIGENDA_CERT_VERIFICATION_DISABLED=true
+      - EIGENDA_PROXY_EIGENDA_SIGNER_PRIVATE_KEY_HEX=${PRIVATE_KEY}
       - EIGENDA_PROXY_EIGENDA_DISPERSER_RPC=disperser-holesky.eigenda.xyz:443
       - EIGENDA_PROXY_EIGENDA_SERVICE_MANAGER_ADDR=0xD4A7E1Bd8015057293f0D0A557088c286942e84b
-      - EIGENDA_PROXY_EIGENDA_ETH_RPC=$ETH_RPC
+      - EIGENDA_PROXY_EIGENDA_ETH_RPC=https://ethereum-holesky-rpc.publicnode.com
       - EIGENDA_PROXY_EIGENDA_ETH_CONFIRMATION_DEPTH=0
       - EIGENDA_PROXY_METRICS_ADDR=0.0.0.0
       - EIGENDA_PROXY_METRICS_ENABLED=true
       - EIGENDA_PROXY_METRICS_PORT=7300
+      ## S3 configuration
+      - EIGENDA_PROXY_S3_CREDENTIAL_TYPE=static
+      - EIGENDA_PROXY_S3_ACCESS_KEY_ID=minioadmin
+      - EIGENDA_PROXY_S3_ACCESS_KEY_SECRET=minioadmin
+      - EIGENDA_PROXY_S3_BUCKET=eigenda-proxy-test
+      - EIGENDA_PROXY_S3_PATH=""
+      - EIGENDA_PROXY_S3_ENDPOINT=minio:9000
+      - EIGENDA_PROXY_S3_ENABLE_TLS=false
+
+      ## Redis configuration
+      - EIGENDA_PROXY_REDIS_DB=0
+      - EIGENDA_PROXY_REDIS_ENDPOINT=redis:6379
+      - EIGENDA_PROXY_REDIS_PASSWORD=redispassword
+      - EIGENDA_PROXY_REDIS_EVICTION=24h0m0s
+
+      ## Secondary routing
+      - EIGENDA_PROXY_STORAGE_FALLBACK_TARGETS=s3
+      - EIGENDA_PROXY_STORAGE_CACHE_TARGETS=redis
+
     ports:
       - 4242:4242
       - 7300:7300
@@ -44,14 +104,6 @@ services:
     depends_on:
       - prometheus
 
-  traffic-generator:
-    image: alpine:latest
-    build: scripts/
-    container_name: traffic_generator
-    depends_on:
-      - eigenda_proxy
-    volumes:
-      - ./scripts/:/scripts/
-
 volumes:
   grafana-data:
+  minio_data:
\ No newline at end of file
diff --git a/scripts/create-test-s3-bucket.sh b/scripts/create-test-s3-bucket.sh
new file mode 100755
index 00000000..3d850fd7
--- /dev/null
+++ b/scripts/create-test-s3-bucket.sh
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+# Wait 2 seconds to ensure minio has finished bootstrapping
+# TODO: replace this semi-arbitrary timeout with event-based polling of the minio server
+sleep 2s
+
+# Configure the MinIO client (mc)
+echo "Configuring MinIO client..."
+mc alias set local http://minio:9000 minioadmin minioadmin
+
+# Ensure the bucket exists
+echo "Creating bucket: eigenda-proxy-test..."
+mc mb local/eigenda-proxy-test || echo "Bucket already exists."
+
+echo "Bucket setup complete."
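A quick way to exercise this stack locally: the docker compose and redis-cli commands below are standard, but the proxy's /put and /get routes and the commitment_mode=simple query parameter are assumptions about its REST API rather than something introduced by this diff, so adjust them to whatever the server actually exposes.

  # build and start the stack; minio-init seeds the eigenda-proxy-test bucket
  docker compose up -d --build
  docker compose logs minio-init

  # confirm redis accepts the configured password (expect PONG)
  docker compose exec redis redis-cli -a redispassword ping

  # round-trip a blob through the proxy (memstore-backed), assuming simple-commitment routes
  curl -s -X POST "http://localhost:4242/put?commitment_mode=simple" --data-binary 'hello eigenda' -o commitment.bin
  curl -s "http://localhost:4242/get/0x$(xxd -p -c 1000 commitment.bin)?commitment_mode=simple"

  # the debug logs added in this change should show the redis cache and s3 fallback writes
  docker compose logs eigenda_proxy | grep -i secondary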
diff --git a/server/load_store.go b/server/load_store.go
index cb75a6ae..befe5e0a 100644
--- a/server/load_store.go
+++ b/server/load_store.go
@@ -121,7 +121,7 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr
 		return nil, err
 	}
 
-	// create secondary storage router
+	// create secondary storage manager
 	fallbacks := populateTargets(cfg.EigenDAConfig.StorageConfig.FallbackTargets, s3Store, redisStore)
 	caches := populateTargets(cfg.EigenDAConfig.StorageConfig.CacheTargets, s3Store, redisStore)
 	secondary := store.NewSecondaryManager(log, m, caches, fallbacks)
@@ -135,6 +135,10 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr
 		}
 	}
 
-	log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil)
+	log.Info("Created storage backends",
+		"eigenda", eigenDA != nil,
+		"s3", s3Store != nil,
+		"redis", redisStore != nil,
+	)
 	return store.NewManager(eigenDA, s3Store, log, secondary)
 }
diff --git a/store/generated_key/memstore/memstore.go b/store/generated_key/memstore/memstore.go
index dce04571..59cf18cb 100644
--- a/store/generated_key/memstore/memstore.go
+++ b/store/generated_key/memstore/memstore.go
@@ -83,7 +83,6 @@ func (e *MemStore) pruningLoop(ctx context.Context) {
 			return
 
 		case <-timer.C:
-			e.l.Debug("pruning expired blobs")
 			e.pruneExpired()
 		}
 	}
@@ -99,7 +98,7 @@ func (e *MemStore) pruneExpired() {
 			delete(e.keyStarts, commit)
 			delete(e.store, commit)
 
-			e.l.Info("blob pruned", "commit", commit)
+			e.l.Debug("blob pruned", "commit", commit)
 		}
 	}
 }
diff --git a/store/manager.go b/store/manager.go
index 93721d89..fe831f72 100644
--- a/store/manager.go
+++ b/store/manager.go
@@ -129,11 +129,13 @@ func (m *Manager) Put(ctx context.Context, cm commitments.CommitmentMode, key, v
 
 	// 2 - Put blob into secondary storage backends
 	if m.secondary.Enabled() && m.secondary.AsyncWriteEntry() { // publish put notification to secondary's subscription on PutNotify topic
+		m.log.Debug("Publishing data to async secondary stores")
 		m.secondary.Topic() <- PutNotify{
 			Commitment: commit,
 			Value:      value,
 		}
 	} else if m.secondary.Enabled() && !m.secondary.AsyncWriteEntry() { // secondary is available only for synchronous writes
+		m.log.Debug("Publishing data to single threaded secondary stores")
 		err := m.secondary.HandleRedundantWrites(ctx, commit, value)
 		if err != nil {
 			m.log.Error("Secondary insertions failed", "error", err.Error())
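The store/manager.go hunk above and the store/secondary.go changes below both touch the secondary-write path, so here is a minimal, self-contained Go sketch of that pattern for context: the primary manager publishes a put notification on an unbuffered channel, and a subscription loop fans each blob out to every secondary backend. Every name in the sketch is an illustrative stand-in, not one of the repository's actual types.

package main

import (
	"context"
	"fmt"
	"time"
)

// putNotify carries a commitment key plus the blob bytes to be written redundantly.
type putNotify struct {
	commitment []byte
	value      []byte
}

// backend is a stand-in for a secondary store such as the Redis cache or S3 fallback.
type backend interface {
	Name() string
	Put(ctx context.Context, key, value []byte) error
}

// memBackend is an in-memory backend used only to make the sketch runnable.
type memBackend struct {
	name string
	data map[string][]byte
}

func (m *memBackend) Name() string { return m.name }

func (m *memBackend) Put(_ context.Context, key, value []byte) error {
	m.data[string(key)] = value
	return nil
}

// writeSubscriptionLoop consumes notifications until the context is cancelled and
// writes each blob to every secondary backend, logging (rather than returning) errors
// so a failing cache or fallback never blocks the primary write path.
func writeSubscriptionLoop(ctx context.Context, topic <-chan putNotify, backends []backend) {
	for {
		select {
		case n := <-topic:
			for _, b := range backends {
				if err := b.Put(ctx, n.commitment, n.value); err != nil {
					fmt.Printf("secondary write to %s failed: %v\n", b.Name(), err)
				}
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	topic := make(chan putNotify) // unbuffered, so producers hand off directly to the loop
	secondaries := []backend{
		&memBackend{name: "redis-cache", data: map[string][]byte{}},
		&memBackend{name: "s3-fallback", data: map[string][]byte{}},
	}
	go writeSubscriptionLoop(ctx, topic, secondaries)

	// Primary write path: publish the notification and move on; the loop above
	// performs the redundant writes asynchronously.
	topic <- putNotify{commitment: []byte("0xabc"), value: []byte("blob bytes")}

	time.Sleep(50 * time.Millisecond) // crude wait, for this toy example only
	fmt.Println("s3-fallback copies stored:", len(secondaries[1].(*memBackend).data))
}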
diff --git a/store/secondary.go b/store/secondary.go
index 9814095c..82251648 100644
--- a/store/secondary.go
+++ b/store/secondary.go
@@ -33,7 +33,7 @@ type ISecondary interface {
 	WriteSubscriptionLoop(ctx context.Context)
 }
 
-// PutNotify ... notification received by primary router to perform insertion across
+// PutNotify ... notification received by primary manager to perform insertion across
 // secondary storage backends
 type PutNotify struct {
 	Commitment []byte
@@ -53,7 +53,7 @@ type SecondaryManager struct {
 	concurrentWrites bool
}
 
-// NewSecondaryManager ... creates a new secondary storage router
+// NewSecondaryManager ... creates a new secondary storage manager
 func NewSecondaryManager(log log.Logger, m metrics.Metricer, caches []common.PrecomputedKeyStore, fallbacks []common.PrecomputedKeyStore) ISecondary {
 	return &SecondaryManager{
 		topic: make(chan PutNotify), // channel is un-buffered which dispersing consumption across routines helps alleviate
@@ -92,6 +92,7 @@ func (sm *SecondaryManager) HandleRedundantWrites(ctx context.Context, commitmen
 	successes := 0
 
 	for _, src := range sources {
+		sm.log.Debug("Attempting to write to secondary storage", "backend", src.BackendType())
 		cb := sm.m.RecordSecondaryRequest(src.BackendType().String(), http.MethodPut)
 
 		// for added safety - we retry the insertion 5x using a default exponential backoff
@@ -115,12 +116,12 @@ func (sm *SecondaryManager) HandleRedundantWrites(ctx context.Context, commitmen
 	return nil
 }
 
-// AsyncWriteEntry ... subscribes to put notifications posted to shared topic with primary router
+// AsyncWriteEntry ... returns whether secondary writes are dispatched asynchronously via the topic shared with the primary manager
 func (sm *SecondaryManager) AsyncWriteEntry() bool {
 	return sm.concurrentWrites
 }
 
-// WriteSubscriptionLoop ... subscribes to put notifications posted to shared topic with primary router
+// WriteSubscriptionLoop ... subscribes to put notifications posted to shared topic with primary manager
 func (sm *SecondaryManager) WriteSubscriptionLoop(ctx context.Context) {
 	sm.concurrentWrites = true