Reader fragment #749

Merged 8 commits on Oct 3, 2024
19 changes: 19 additions & 0 deletions api/clients/mock/retrieval_client.go
@@ -35,3 +35,22 @@ func (c *MockRetrievalClient) RetrieveBlob(
	result := args.Get(0)
	return result.([]byte), args.Error(1)
}

func (c *MockRetrievalClient) RetrieveBlobChunks(
	ctx context.Context,
	batchHeaderHash [32]byte,
	blobIndex uint32,
	referenceBlockNumber uint,
	batchRoot [32]byte,
	quorumID core.QuorumID) (*clients.BlobChunks, error) {

	args := c.Called(batchHeaderHash, blobIndex, referenceBlockNumber, batchRoot, quorumID)
	return args.Get(0).(*clients.BlobChunks), args.Error(1)
}

func (c *MockRetrievalClient) CombineChunks(chunks *clients.BlobChunks) ([]byte, error) {
	args := c.Called(chunks)

	result := args.Get(0)
	return result.([]byte), args.Error(1)
}
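
The new mock methods follow the existing Called/Return pattern, so a test can drive them with testify expectations. A minimal sketch (not part of this diff), assuming MockRetrievalClient embeds testify's mock.Mock (as the Called calls suggest) and that its zero value is usable; the import paths and argument values are illustrative only:

```go
package mock_test

import (
	"context"
	"testing"

	"github.com/Layr-Labs/eigenda/api/clients"
	"github.com/Layr-Labs/eigenda/api/clients/mock"
	"github.com/Layr-Labs/eigenda/core"
	"github.com/stretchr/testify/assert"
)

func TestRetrieveBlobChunksMock(t *testing.T) {
	client := &mock.MockRetrievalClient{}
	expectedChunks := &clients.BlobChunks{}

	// Expectation arguments mirror the Called(...) invocations in the mock,
	// which deliberately omit the context parameter.
	client.On("RetrieveBlobChunks", [32]byte{}, uint32(0), uint(100), [32]byte{}, core.QuorumID(0)).
		Return(expectedChunks, nil)
	client.On("CombineChunks", expectedChunks).Return([]byte("blob"), nil)

	chunks, err := client.RetrieveBlobChunks(context.Background(), [32]byte{}, 0, 100, [32]byte{}, 0)
	assert.NoError(t, err)

	blob, err := client.CombineChunks(chunks)
	assert.NoError(t, err)
	assert.Equal(t, []byte("blob"), blob)
	client.AssertExpectations(t)
}
```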
72 changes: 65 additions & 7 deletions api/clients/retrieval_client.go
@@ -4,24 +4,50 @@ import (
"context"
"errors"
"fmt"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/wealdtech/go-merkletree/v2"

"github.com/gammazero/workerpool"
"github.com/wealdtech/go-merkletree/v2"
"github.com/wealdtech/go-merkletree/v2/keccak256"
)

// RetrievalClient is an object that can retrieve blobs from the network.
type RetrievalClient interface {

	// RetrieveBlob fetches a blob from the network. This method is equivalent to calling
	// RetrieveBlobChunks to get the chunks and then CombineChunks to recombine those chunks into the original blob.
	RetrieveBlob(
		ctx context.Context,
		batchHeaderHash [32]byte,
		blobIndex uint32,
		referenceBlockNumber uint,
		batchRoot [32]byte,
		quorumID core.QuorumID) ([]byte, error)

	// RetrieveBlobChunks downloads the chunks of a blob from the network but does not recombine them. Use this method
	// if detailed information about which node returned which chunk is needed. Otherwise, use RetrieveBlob.
	RetrieveBlobChunks(
		ctx context.Context,
		batchHeaderHash [32]byte,
		blobIndex uint32,
		referenceBlockNumber uint,
		batchRoot [32]byte,
		quorumID core.QuorumID) (*BlobChunks, error)

	// CombineChunks recombines the chunks into the original blob.
	CombineChunks(chunks *BlobChunks) ([]byte, error)
}

// BlobChunks is a collection of chunks retrieved from the network which can be recombined into a blob.
type BlobChunks struct {
	Chunks           []*encoding.Frame
	Indices          []encoding.ChunkNumber
	EncodingParams   encoding.EncodingParams
	BlobHeaderLength uint
	Assignments      map[core.OperatorID]core.Assignment
	AssignmentInfo   core.AssignmentInfo
}

type retrievalClient struct {
@@ -33,16 +59,14 @@ type retrievalClient struct {
	numConnections int
}

var _ RetrievalClient = (*retrievalClient)(nil)

// NewRetrievalClient creates a new retrieval client.
func NewRetrievalClient(
	logger logging.Logger,
	chainState core.IndexedChainState,
	assignmentCoordinator core.AssignmentCoordinator,
	nodeClient NodeClient,
	verifier encoding.Verifier,
	numConnections int,
) (*retrievalClient, error) {
	numConnections int) (RetrievalClient, error) {

	return &retrievalClient{
		logger: logger.With("component", "RetrievalClient"),
@@ -54,13 +78,31 @@ func NewRetrievalClient(
	}, nil
}

// RetrieveBlob retrieves a blob from the network.
func (r *retrievalClient) RetrieveBlob(
	ctx context.Context,
	batchHeaderHash [32]byte,
	blobIndex uint32,
	referenceBlockNumber uint,
	batchRoot [32]byte,
	quorumID core.QuorumID) ([]byte, error) {

	chunks, err := r.RetrieveBlobChunks(ctx, batchHeaderHash, blobIndex, referenceBlockNumber, batchRoot, quorumID)
	if err != nil {
		return nil, err
	}

	return r.CombineChunks(chunks)
}

// RetrieveBlobChunks retrieves the chunks of a blob from the network but does not recombine them.
func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
	batchHeaderHash [32]byte,
	blobIndex uint32,
	referenceBlockNumber uint,
	batchRoot [32]byte,
	quorumID core.QuorumID) (*BlobChunks, error) {

	indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, referenceBlockNumber, []core.QuorumID{quorumID})
	if err != nil {
		return nil, err
@@ -173,5 +215,21 @@ func (r *retrievalClient) RetrieveBlob(
		indices = append(indices, assignment.GetIndices()...)
	}

	return r.verifier.Decode(chunks, indices, encodingParams, uint64(blobHeader.Length)*encoding.BYTES_PER_SYMBOL)
	return &BlobChunks{
		Chunks:           chunks,
		Indices:          indices,
		EncodingParams:   encodingParams,
		BlobHeaderLength: blobHeader.Length,
		Assignments:      assignments,
		AssignmentInfo:   info,
	}, nil
}

// CombineChunks recombines the chunks into the original blob.
func (r *retrievalClient) CombineChunks(chunks *BlobChunks) ([]byte, error) {
	return r.verifier.Decode(
		chunks.Chunks,
		chunks.Indices,
		chunks.EncodingParams,
		uint64(chunks.BlobHeaderLength)*encoding.BYTES_PER_SYMBOL)
}
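
Splitting retrieval into RetrieveBlobChunks and CombineChunks lets a caller inspect per-operator assignment data between the two steps. A rough caller-side sketch (not from this PR; the function name and surrounding wiring are assumptions):

```go
package example

import (
	"context"
	"fmt"

	"github.com/Layr-Labs/eigenda/api/clients"
	"github.com/Layr-Labs/eigenda/core"
)

// retrieveWithDetail fetches a blob in two steps so the caller can see which
// operators were assigned chunks before the blob is recombined.
func retrieveWithDetail(
	ctx context.Context,
	client clients.RetrievalClient,
	batchHeaderHash [32]byte,
	blobIndex uint32,
	referenceBlockNumber uint,
	batchRoot [32]byte,
	quorumID core.QuorumID) ([]byte, error) {

	chunks, err := client.RetrieveBlobChunks(ctx, batchHeaderHash, blobIndex, referenceBlockNumber, batchRoot, quorumID)
	if err != nil {
		return nil, err
	}

	// Per-operator detail is only reachable through the chunk-level API.
	for operatorID := range chunks.Assignments {
		fmt.Printf("operator %x was assigned chunks for this blob\n", operatorID)
	}

	// RetrieveBlob performs exactly these two steps internally.
	return client.CombineChunks(chunks)
}
```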
1 change: 0 additions & 1 deletion tools/traffic/config/config.go
@@ -99,7 +99,6 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
		NumReadInstances:             ctx.GlobalUint(NumReadInstancesFlag.Name),
		ReadRequestInterval:          ctx.Duration(ReadRequestIntervalFlag.Name),
		RequiredDownloads:            ctx.Float64(RequiredDownloadsFlag.Name),
		ReadOverflowTableSize:        ctx.Uint(ReadOverflowTableSizeFlag.Name),
		FetchBatchHeaderTimeout:      ctx.Duration(FetchBatchHeaderTimeoutFlag.Name),
		RetrieveBlobChunksTimeout:    ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name),
		StatusTrackerChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name),
8 changes: 0 additions & 8 deletions tools/traffic/config/flags.go
@@ -192,13 +192,6 @@ var (
		Value:  3.0,
		EnvVar: common.PrefixEnvVar(envPrefix, "REQUIRED_DOWNLOADS"),
	}
	ReadOverflowTableSizeFlag = cli.UintFlag{
		Name:     common.PrefixFlag(FlagPrefix, "read-overflow-table-size"),
		Usage:    "Size of the overflow table for read requests.",
		Required: false,
		Value:    1024,
		EnvVar:   common.PrefixEnvVar(envPrefix, "READ_OVERFLOW_TABLE_SIZE"),
	}
	FetchBatchHeaderTimeoutFlag = cli.DurationFlag{
		Name:  common.PrefixFlag(FlagPrefix, "fetch-batch-header-timeout"),
		Usage: "Amount of time to wait for a batch header to be fetched.",
@@ -243,7 +236,6 @@ var optionalFlags = []cli.Flag{
	FetchBatchHeaderTimeoutFlag,
	RetrieveBlobChunksTimeoutFlag,
	GetBlobStatusTimeoutFlag,
	ReadOverflowTableSizeFlag,
	WriteTimeoutFlag,
	VerificationChannelCapacityFlag,
}
4 changes: 0 additions & 4 deletions tools/traffic/config/worker_config.go
@@ -31,10 +31,6 @@ type WorkerConfig struct {
	// 0 or 1 times with the specified probability (e.g. 0.2 means each blob has a 20% chance of being downloaded).
	// If greater than 1.0, then each blob will be downloaded the specified number of times.
	RequiredDownloads float64
	// The size of a table of blobs to optionally read when we run out of blobs that we are required to read. Blobs
	// that are no longer required are added to this table, and when the table is at capacity they are randomly retired.
	// Set this to 0 to disable this feature.
	ReadOverflowTableSize uint
	// The amount of time to wait for a batch header to be fetched.
	FetchBatchHeaderTimeout time.Duration
	// The amount of time to wait for a blob to be retrieved.
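
For context, the RequiredDownloads semantics described in the WorkerConfig comment above (a fraction is a per-blob download probability, a value above 1.0 is a fixed download count) could be interpreted roughly as follows; this helper is hypothetical and not part of the traffic generator:

```go
package example

import "math/rand"

// requiredReads converts the RequiredDownloads setting into a concrete number
// of reads for one blob: a fractional value is the probability of a single
// read, and a value of 1.0 or more is a fixed read count.
func requiredReads(requiredDownloads float64, rng *rand.Rand) int {
	if requiredDownloads <= 0 {
		return 0
	}
	if requiredDownloads < 1.0 {
		if rng.Float64() < requiredDownloads {
			return 1
		}
		return 0
	}
	return int(requiredDownloads)
}
```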
16 changes: 0 additions & 16 deletions tools/traffic/metrics/latency_metric.go
@@ -22,22 +22,6 @@ func (metric *latencyMetric) ReportLatency(latency time.Duration) {
	metric.metrics.latency.WithLabelValues(metric.description).Observe(latency.Seconds())
}

// InvokeAndReportLatency performs an operation. If the operation does not produce an error, then the latency
// of the operation is reported to the metrics framework.
func InvokeAndReportLatency[T any](metric LatencyMetric, operation func() (T, error)) (T, error) {
	start := time.Now()

	t, err := operation()

	if err == nil {
		end := time.Now()
		duration := end.Sub(start)
		metric.ReportLatency(duration)
	}

	return t, err
}

// buildLatencyCollector creates a new prometheus collector for latency metrics.
func buildLatencyCollector(namespace string, registry *prometheus.Registry) *prometheus.SummaryVec {
	return promauto.With(registry).NewSummaryVec(
Expand Down