Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: eigenda client returns 503 errors (for failover purpose) #828

Merged
merged 8 commits into from
Oct 30, 2024
2 changes: 2 additions & 0 deletions api/clients/codecs/default_blob_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ func NewDefaultBlobCodec() DefaultBlobCodec {
return DefaultBlobCodec{}
}

// EncodeBlob can never return an error, but to maintain the interface it is included
// so that it can be swapped for the IFFTCodec without changing the interface
func (v DefaultBlobCodec) EncodeBlob(rawData []byte) ([]byte, error) {
codecBlobHeader := make([]byte, 32)
// first byte is always 0 to ensure the codecBlobHeader is a valid bn254 element
Expand Down
1 change: 1 addition & 0 deletions api/clients/codecs/ifft_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func (v IFFTCodec) EncodeBlob(data []byte) ([]byte, error) {
var err error
data, err = v.writeCodec.EncodeBlob(data)
if err != nil {
// this cannot happen, because EncodeBlob never returns an error
return nil, fmt.Errorf("error encoding data: %w", err)
}

Expand Down
23 changes: 21 additions & 2 deletions api/clients/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ type EigenDAClientConfig struct {
// TODO: we should change this param as its name is quite confusing
ResponseTimeout time.Duration

// The total amount of time that the client will spend waiting for EigenDA
// to "confirm" (include onchain) a blob after it has been dispersed. Note that
// we stick to "confirm" here but this really means InclusionTimeout,
// not confirmation in the sense of confirmation depth.
//
// If ConfirmationTimeout time passes and the blob is not yet confirmed,
// the client will return an api.ErrorFailover to let the caller failover to EthDA.
ConfirmationTimeout time.Duration

// The total amount of time that the client will spend waiting for EigenDA
// to confirm a blob after it has been dispersed
// Note that reasonable values for this field will depend on the value of WaitForFinalization.
Expand Down Expand Up @@ -81,15 +90,25 @@ func (c *EigenDAClientConfig) CheckAndSetDefaults() error {
return fmt.Errorf("EigenDAClientConfig.EthRpcUrl not set. Needed to verify blob confirmed on-chain.")
}

if c.ResponseTimeout == 0 {
c.ResponseTimeout = 30 * time.Second
}
if c.ConfirmationTimeout == 0 {
// batching interval on mainnet is 10 minutes,
// so we set the confirmation timeout to 15 minutes to give some buffer
c.ConfirmationTimeout = 15 * time.Minute
}
if c.StatusQueryRetryInterval == 0 {
c.StatusQueryRetryInterval = 5 * time.Second
}
if c.StatusQueryTimeout == 0 {
c.StatusQueryTimeout = 25 * time.Minute
}
if c.ResponseTimeout == 0 {
c.ResponseTimeout = 30 * time.Second
if c.ConfirmationTimeout > c.StatusQueryTimeout {
// doesn't make sense... confirmation is about onchain inclusion, whereas status query is about reaching finality (after inclusion)
return fmt.Errorf("EigenDAClientConfig.ConfirmationTimeout (%v) > EigenDAClientConfig.StatusQueryTimeout (%v)", c.ConfirmationTimeout, c.StatusQueryTimeout)
}

if len(c.SignerPrivateKeyHex) > 0 && len(c.SignerPrivateKeyHex) != 64 {
return fmt.Errorf("a valid length SignerPrivateKeyHex needs to have 64 bytes")
}
Expand Down
53 changes: 30 additions & 23 deletions api/clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package clients
import (
"context"
"crypto/tls"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -42,6 +41,8 @@ func NewConfig(hostname, port string, timeout time.Duration, useSecureGrpcFlag b
type DisperserClient interface {
Close() error
DisperseBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
// DisperseBlobAuthenticated disperses a blob with an authenticated request.
// The BlobStatus returned will always be PROCESSSING if error is nil.
DisperseBlobAuthenticated(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
DispersePaidBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error)
Expand Down Expand Up @@ -112,7 +113,7 @@ func (c *disperserClient) Close() error {
func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, nil, fmt.Errorf("error initializing connection: %w", err)
return nil, nil, api.NewErrorFailover(err)
}

ctxTimeout, cancel := context.WithTimeout(ctx, c.config.Timeout)
Expand Down Expand Up @@ -154,17 +155,17 @@ func (c *disperserClient) DispersePaidBlob(ctx context.Context, data []byte, quo
func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, nil, fmt.Errorf("error initializing connection: %w", err)
return nil, nil, api.NewErrorFailover(err)
}

if c.signer == nil {
return nil, nil, fmt.Errorf("uninitialized signer for authenticated dispersal")
return nil, nil, api.NewErrorInternal("uninitialized signer for authenticated dispersal")
}

// first check if signer is valid
accountId, err := c.signer.GetAccountID()
if err != nil {
return nil, nil, fmt.Errorf("please configure signer key if you want to use authenticated endpoint %w", err)
return nil, nil, api.NewErrorInvalidArg(fmt.Sprintf("please configure signer key if you want to use authenticated endpoint %v", err))
}

quorumNumbers := make([]uint32, len(quorums))
Expand All @@ -175,7 +176,10 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
// check every 32 bytes of data are within the valid range for a bn254 field element
_, err = rs.ToFrArray(data)
if err != nil {
return nil, nil, fmt.Errorf("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617, %w", err)
return nil, nil, api.NewErrorInvalidArg(
fmt.Sprintf("encountered an error to convert a 32-bytes into a valid field element, "+
"please use the correct format where every 32bytes(big-endian) is less than "+
"21888242871839275222246405745257275088548364400416034343698204186575808495617, %v", err))
}

request := &disperser_rpc.DisperseBlobRequest{
Expand All @@ -185,19 +189,19 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
}

ctxTimeout, cancel := context.WithTimeout(ctx, c.config.Timeout)

defer cancel()

stream, err := c.client.DisperseBlobAuthenticated(ctxTimeout)
if err != nil {
// grpc client errors return grpc errors, so we can just wrap the error in a normal wrapError,
// no need to wrap in another grpc error as we do with other errors above.
return nil, nil, fmt.Errorf("error while calling DisperseBlobAuthenticated: %w", err)
}

// Send the initial request
err = stream.Send(&disperser_rpc.AuthenticatedRequest{Payload: &disperser_rpc.AuthenticatedRequest_DisperseRequest{
DisperseRequest: request,
}})

if err != nil {
return nil, nil, fmt.Errorf("failed to send request: %w", err)
}
Expand All @@ -209,18 +213,17 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
}
authHeaderReply, ok := reply.Payload.(*disperser_rpc.AuthenticatedReply_BlobAuthHeader)
if !ok {
return nil, nil, errors.New("expected challenge")
return nil, nil, api.NewErrorInternal(fmt.Sprintf("client expected challenge from disperser, instead received: %v", reply))
}

authHeader := core.BlobAuthHeader{
BlobCommitments: encoding.BlobCommitments{},
AccountID: "",
Nonce: authHeaderReply.BlobAuthHeader.ChallengeParameter,
}

authData, err := c.signer.SignBlobRequest(authHeader)
if err != nil {
return nil, nil, errors.New("error signing blob request")
return nil, nil, api.NewErrorInternal(fmt.Sprintf("error signing blob request: %v", err))
}

// Process challenge and send back challenge_reply
Expand All @@ -239,12 +242,17 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
}
disperseReply, ok := reply.Payload.(*disperser_rpc.AuthenticatedReply_DisperseReply) // Process the final disperse_reply
if !ok {
return nil, nil, errors.New("expected DisperseReply")
return nil, nil, api.NewErrorInternal(fmt.Sprintf("client expected DisperseReply from disperser, instead received: %v", reply))
}

blobStatus, err := disperser.FromBlobStatusProto(disperseReply.DisperseReply.GetResult())
if err != nil {
return nil, nil, err
return nil, nil, api.NewErrorInternal(fmt.Sprintf("parsing blob status: %v", err))
}

// Assert: only status that makes sense is processing. Anything else is a bug on disperser side.
if *blobStatus != disperser.Processing {
return nil, nil, api.NewErrorInternal(fmt.Sprintf("expected status to be Processing, got %v", *blobStatus))
}

return blobStatus, disperseReply.DisperseReply.GetRequestId(), nil
Expand All @@ -253,7 +261,7 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
func (c *disperserClient) GetBlobStatus(ctx context.Context, requestID []byte) (*disperser_rpc.BlobStatusReply, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, fmt.Errorf("error initializing connection: %w", err)
return nil, api.NewErrorInternal(err.Error())
}

ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60)
Expand All @@ -262,19 +270,13 @@ func (c *disperserClient) GetBlobStatus(ctx context.Context, requestID []byte) (
request := &disperser_rpc.BlobStatusRequest{
RequestId: requestID,
}

reply, err := c.client.GetBlobStatus(ctxTimeout, request)
if err != nil {
return nil, err
}

return reply, nil
return c.client.GetBlobStatus(ctxTimeout, request)
}

func (c *disperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, fmt.Errorf("error initializing connection: %w", err)
return nil, api.NewErrorInternal(err.Error())
}

ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60)
Expand All @@ -289,6 +291,8 @@ func (c *disperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []by
return reply.Data, nil
}

// initOnceGrpcConnection initializes the grpc connection and client if they are not already initialized.
// If initialization fails, it caches the error and will return it on every subsequent call.
func (c *disperserClient) initOnceGrpcConnection() error {
var initErr error
c.initOnce.Do(func() {
Expand All @@ -302,7 +306,10 @@ func (c *disperserClient) initOnceGrpcConnection() error {
c.conn = conn
c.client = disperser_rpc.NewDisperserClient(conn)
})
return initErr
if initErr != nil {
return fmt.Errorf("initializing grpc connection: %w", initErr)
}
return nil
}

func getGrpcDialOptions(useSecureGrpcFlag bool) []grpc.DialOption {
Expand Down
27 changes: 27 additions & 0 deletions api/clients/disperser_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package clients_test

import (
"context"
"testing"
"time"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func TestPutBlobNoopSigner(t *testing.T) {
config := clients.NewConfig("nohost", "noport", time.Second, false)
disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner())

test := []byte("test")
test[0] = 0x00 // make sure the first byte of the requst is always 0
quorums := []uint8{0}
_, _, err := disperserClient.DisperseBlobAuthenticated(context.Background(), test, quorums)
st, isGRPCError := status.FromError(err)
assert.True(t, isGRPCError)
assert.Equal(t, codes.InvalidArgument.String(), st.Code().String())
assert.Equal(t, "please configure signer key if you want to use authenticated endpoint noop signer cannot get accountID", st.Message())
}
Loading
Loading