Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional thread safety for parallel batch calls #94

Merged
merged 3 commits into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions pkg/clients/ethereum/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,19 @@ type IndexedRpcRequestResponse struct {
Response *RPCResponse
}

type BatchedResponse struct {
Index int
Response *RPCResponse
}

// ChunkedBatchCall splits the requests into chunks of CHUNKED_BATCH_SIZE and sends them in parallel
// by calling the regular client.call method rather than relying on the batch call method.
//
// This function allows for better retry and error handling over the batch call method.
func (c *Client) ChunkedBatchCall(ctx context.Context, requests []*RPCRequest) ([]*RPCResponse, error) {
batches := [][]*IndexedRpcRequestResponse{}

// all requests in a flat list with their index stored
orderedRequestResponses := make([]*IndexedRpcRequestResponse, 0)
for i, req := range requests {
orderedRequestResponses = append(orderedRequestResponses, &IndexedRpcRequestResponse{
Expand All @@ -386,33 +392,48 @@ func (c *Client) ChunkedBatchCall(ctx context.Context, requests []*RPCRequest) (

// iterate over batches
for i, batch := range batches {
var wg sync.WaitGroup

// create waitGroup for the batch
wg := sync.WaitGroup{}
responses := make(chan BatchedResponse, len(batch))

c.Logger.Sugar().Debugw(fmt.Sprintf("[batch %d] Fetching batch", i),
zap.Int("batchRequests", len(batch)),
)

// iterate over requests in the current batch
// Iterate over requests in the current batch.
// For each batch, create a waitgroup for the go routines and a channel
// to capture the responses. Once all are complete, we can safely iterate
// over the responses and update the origin batch with the responses.
for j, req := range batch {
wg.Add(1)

go func(b *IndexedRpcRequestResponse) {
// capture loop variable to local scope
currentReq := req

go func() {
defer wg.Done()

res, err := c.Call(ctx, req.Request)
res, err := c.Call(ctx, currentReq.Request)
if err != nil {
c.Logger.Sugar().Errorw(fmt.Sprintf("[%d][%d]failed to batch call", i, j),
zap.Error(err),
zap.Any("request", req.Request),
)
return
}
b.Response = res
}(req)
responses <- BatchedResponse{
Index: currentReq.Index,
Response: res,
}
}()
}
wg.Wait()
close(responses)

// now we can safely iterate over the responses channel and update the batch
for response := range responses {
orderedRequestResponses[response.Index].Response = response.Response
}
}

allResults := []*RPCResponse{}
Expand Down
31 changes: 24 additions & 7 deletions pkg/contractCaller/sequentialContractCaller/contractCaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,31 +88,48 @@ func (cc *SequentialContractCaller) GetOperatorRestakedStrategies(ctx context.Co

func (cc *SequentialContractCaller) getOperatorRestakedStrategiesBatch(ctx context.Context, operatorRestakedStrategies []*contractCaller.OperatorRestakedStrategy, blockNumber uint64) ([]*contractCaller.OperatorRestakedStrategy, error) {
var wg sync.WaitGroup
responses := make(chan *contractCaller.OperatorRestakedStrategy, len(operatorRestakedStrategies))

for _, operatorRestakedStrategy := range operatorRestakedStrategies {
wg.Add(1)
go func(ors *contractCaller.OperatorRestakedStrategy) {
// make a local copy of the entire struct
currentReq := *operatorRestakedStrategy
go func() {
defer wg.Done()
results, err := getOperatorRestakedStrategiesRetryable(ctx, ors.Avs, ors.Operator, blockNumber, cc.EthereumClient, cc.Logger)
results, err := getOperatorRestakedStrategiesRetryable(ctx, currentReq.Avs, currentReq.Operator, blockNumber, cc.EthereumClient, cc.Logger)
if err != nil {
cc.Logger.Sugar().Errorw("getOperatorRestakedStrategiesBatch - failed to get results",
zap.String("avs", ors.Avs),
zap.String("operator", ors.Operator),
zap.String("avs", currentReq.Avs),
zap.String("operator", currentReq.Operator),
zap.Uint64("blockNumber", blockNumber),
zap.Error(err),
)
return
}
ors.Results = results
}(operatorRestakedStrategy)
currentReq.Results = results

// send back a pointer to the copied struct
responses <- &currentReq
}()
}
wg.Wait()
return operatorRestakedStrategies, nil
close(responses)

allResponses := make([]*contractCaller.OperatorRestakedStrategy, 0)
for response := range responses {
allResponses = append(allResponses, response)
}
return allResponses, nil
}

const BATCH_SIZE = 25

func (cc *SequentialContractCaller) GetAllOperatorRestakedStrategies(ctx context.Context, operatorRestakedStrategies []*contractCaller.OperatorRestakedStrategy, blockNumber uint64) ([]*contractCaller.OperatorRestakedStrategy, error) {
cc.Logger.Sugar().Infow("SequentialContractCaller.GetAllOperatorRestakedStrategies",
zap.Int("total", len(operatorRestakedStrategies)),
zap.Uint64("blockNumber", blockNumber),
)

batches := make([][]*contractCaller.OperatorRestakedStrategy, 0)
currentIndex := 0
for {
Expand Down
6 changes: 5 additions & 1 deletion pkg/indexer/restakedStrategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func (idx *Indexer) getAndInsertRestakedStrategies(
return err
}

idx.Logger.Sugar().Infow("Got operator restaked strategies",
zap.Int("count", len(results)),
zap.Uint64("blockNumber", blockNumber),
)
for _, result := range results {
avs := result.Avs
operator := result.Operator
Expand All @@ -102,7 +106,7 @@ func (idx *Indexer) getAndInsertRestakedStrategies(
)
return err
} else if err == nil {
idx.Logger.Sugar().Debugw("Inserted restaked strategy",
idx.Logger.Sugar().Infow("Inserted restaked strategy",
zap.String("restakedStrategy", restakedStrategy.String()),
zap.String("operator", operator),
zap.String("avs", avs),
Expand Down
Loading