Skip to content

Commit

Permalink
disperser api server v2
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Oct 8, 2024
1 parent 5323e99 commit 15fa064
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 3 deletions.
6 changes: 4 additions & 2 deletions disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ import (
)

var (
queue disperser.BlobStore
dispersalServer *apiserver.DispersalServer
queue disperser.BlobStore
dispersalServer *apiserver.DispersalServer
dispersalServerV2 *apiserver.DispersalServerV2

dockertestPool *dockertest.Pool
dockertestResource *dockertest.Resource
Expand Down Expand Up @@ -605,6 +606,7 @@ func setup() {
transactor.On("GetRequiredQuorumNumbers", tmock.Anything).Return([]uint8{}, nil)

dispersalServer = newTestServer(transactor)
dispersalServerV2 = newTestServerV2()
}

func teardown() {
Expand Down
76 changes: 76 additions & 0 deletions disperser/apiserver/server_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package apiserver

import (
"context"
"errors"
"fmt"
"net"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
healthcheck "github.com/Layr-Labs/eigenda/common/healthcheck"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigensdk-go/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/reflection"
)

type DispersalServerV2 struct {
pb.UnimplementedDisperserServer

serverConfig disperser.ServerConfig
logger logging.Logger
}

// NewDispersalServerV2 creates a new Server struct with the provided parameters.
func NewDispersalServerV2(
serverConfig disperser.ServerConfig,
_logger logging.Logger,
) *DispersalServerV2 {
logger := _logger.With("component", "DispersalServerV2")

return &DispersalServerV2{
serverConfig: serverConfig,
logger: logger,
}
}

func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBlobRequest) (*pb.DisperseBlobReply, error) {
return &pb.DisperseBlobReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented")
}

func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatusRequest) (*pb.BlobStatusReply, error) {
return &pb.BlobStatusReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented")
}

func (s *DispersalServerV2) GetBlobCommitment(ctx context.Context, req *pb.BlobCommitmentRequest) (*pb.BlobCommitmentReply, error) {
return &pb.BlobCommitmentReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented")
}

func (s *DispersalServerV2) Start(ctx context.Context) error {
// Serve grpc requests
addr := fmt.Sprintf("%s:%s", disperser.Localhost, s.serverConfig.GrpcPort)
listener, err := net.Listen("tcp", addr)
if err != nil {
return errors.New("could not start tcp listener")
}

opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB

gs := grpc.NewServer(opt)
reflection.Register(gs)
pb.RegisterDisperserServer(gs, s)

// Register Server for Health Checks
name := pb.Disperser_ServiceDesc.ServiceName
healthcheck.RegisterHealthServer(name, gs)

s.logger.Info("GRPC Listening", "port", s.serverConfig.GrpcPort, "address", listener.Addr().String())

if err := gs.Serve(listener); err != nil {
return errors.New("could not start GRPC server")
}

return nil
}
51 changes: 51 additions & 0 deletions disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package apiserver_test

import (
"context"
"crypto/rand"
"testing"
"time"

"github.com/Layr-Labs/eigenda/disperser/apiserver"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/Layr-Labs/eigensdk-go/logging"

pbv2 "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/stretchr/testify/assert"
)

func TestV2DisperseBlob(t *testing.T) {
data := make([]byte, 3*1024)
_, err := rand.Read(data)
assert.NoError(t, err)

data = codec.ConvertByPaddingEmptyByte(data)
_, err = dispersalServerV2.DisperseBlob(context.Background(), &pbv2.DisperseBlobRequest{
Data: data,
BlobHeader: &pbv2.BlobHeader{},
})
assert.ErrorContains(t, err, "not implemented")
}

func TestV2GetBlobStatus(t *testing.T) {
_, err := dispersalServerV2.GetBlobStatus(context.Background(), &pbv2.BlobStatusRequest{
BlobKey: []byte{1},
})
assert.ErrorContains(t, err, "not implemented")
}

func TestV2GetBlobCommitment(t *testing.T) {
_, err := dispersalServerV2.GetBlobCommitment(context.Background(), &pbv2.BlobCommitmentRequest{
Data: []byte{1},
})
assert.ErrorContains(t, err, "not implemented")
}

func newTestServerV2() *apiserver.DispersalServerV2 {
logger := logging.NewNoopLogger()
return apiserver.NewDispersalServerV2(disperser.ServerConfig{
GrpcPort: "51002",
GrpcTimeout: 1 * time.Second,
}, logger)
}
17 changes: 16 additions & 1 deletion disperser/cmd/apiserver/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"fmt"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/geth"
Expand All @@ -12,7 +14,15 @@ import (
"github.com/urfave/cli"
)

type DisperserVersion uint

const (
V1 DisperserVersion = 1
V2 DisperserVersion = 2
)

type Config struct {
DisperserVersion DisperserVersion
AwsClientConfig aws.ClientConfig
BlobstoreConfig blobstore.Config
ServerConfig disperser.ServerConfig
Expand All @@ -32,6 +42,10 @@ type Config struct {
}

func NewConfig(ctx *cli.Context) (Config, error) {
version := ctx.GlobalUint(flags.DisperserVersionFlag.Name)
if version != uint(V1) && version != uint(V2) {
return Config{}, fmt.Errorf("unknown disperser version %d", version)
}

ratelimiterConfig, err := ratelimit.ReadCLIConfig(ctx, flags.FlagPrefix)
if err != nil {
Expand All @@ -49,7 +63,8 @@ func NewConfig(ctx *cli.Context) (Config, error) {
}

config := Config{
AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix),
DisperserVersion: DisperserVersion(version),
AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix),
ServerConfig: disperser.ServerConfig{
GrpcPort: ctx.GlobalString(flags.GrpcPortFlag.Name),
GrpcTimeout: ctx.GlobalDuration(flags.GrpcTimeoutFlag.Name),
Expand Down
8 changes: 8 additions & 0 deletions disperser/cmd/apiserver/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "EIGENDA_SERVICE_MANAGER"),
}
/* Optional Flags*/
DisperserVersionFlag = cli.UintFlag{
Name: common.PrefixFlag(FlagPrefix, "disperser-version"),
Usage: "Disperser version. Options are 1 and 2.",
Required: false,
Value: 1,
EnvVar: common.PrefixEnvVar(envVarPrefix, "DISPERSER_VERSION"),
}
MetricsHTTPPort = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "metrics-http-port"),
Usage: "the http port which the metrics prometheus server is listening",
Expand Down Expand Up @@ -113,6 +120,7 @@ var requiredFlags = []cli.Flag{
}

var optionalFlags = []cli.Flag{
DisperserVersionFlag,
MetricsHTTPPort,
EnableMetrics,
EnableRatelimiter,
Expand Down
5 changes: 5 additions & 0 deletions disperser/cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func RunDisperserServer(ctx *cli.Context) error {
return err
}

if config.DisperserVersion == V2 {
server := apiserver.NewDispersalServerV2(config.ServerConfig, logger)
return server.Start(context.Background())
}

client, err := geth.NewMultiHomingClient(config.EthClientConfig, gethcommon.Address{}, logger)
if err != nil {
logger.Error("Cannot create chain.Client", "err", err)
Expand Down

0 comments on commit 15fa064

Please sign in to comment.