-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
arbitrum nitro support: implement the jsonrpc client logic to get required data #67
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,15 @@ | ||
package jsonrpc | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"errors" | ||
"encoding/json" | ||
"fmt" | ||
"log/slog" | ||
"time" | ||
|
||
"github.com/duneanalytics/blockchain-ingester/models" | ||
"golang.org/x/sync/errgroup" | ||
) | ||
|
||
type ArbitrumNitroClient struct { | ||
|
@@ -30,15 +33,107 @@ func NewArbitrumNitroClient(log *slog.Logger, cfg Config) (*ArbitrumNitroClient, | |
// TODO: this method should be optional | ||
// 2. call to eth_getTransactionReceipt for each Tx present in the Block | ||
// | ||
// We encode the payload in NDJSON, and use a header line to indicate how many Tx are present in the block | ||
func (c *ArbitrumNitroClient) BlockByNumber(_ context.Context, blockNumber int64) (models.RPCBlock, error) { | ||
// We encode the payload in NDJSON | ||
func (c *ArbitrumNitroClient) BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error) { | ||
tStart := time.Now() | ||
defer func() { | ||
c.log.Debug("BlockByNumber", "blockNumber", blockNumber, "duration", time.Since(tStart)) | ||
}() | ||
// TODO: lets not implement this yet | ||
return models.RPCBlock{ | ||
BlockNumber: blockNumber, | ||
Error: errors.New("not implemented"), | ||
}, errors.New("not implemented") | ||
|
||
blockNumberHex := fmt.Sprintf("0x%x", blockNumber) | ||
|
||
methods := []string{ | ||
"eth_getBlockByNumber", | ||
"debug_traceBlockByNumber", | ||
} | ||
methodArgs := map[string][]any{ | ||
"eth_getBlockByNumber": {blockNumberHex, true}, | ||
"debug_traceBlockByNumber": {blockNumberHex, map[string]string{"tracer": "callTracer"}}, | ||
} | ||
group, groupCtx := errgroup.WithContext(ctx) | ||
results := make([]*bytes.Buffer, len(methods)) | ||
for i, method := range methods { | ||
results[i] = c.bufPool.Get().(*bytes.Buffer) | ||
defer c.putBuffer(results[i]) | ||
|
||
group.Go(func() error { | ||
errCh := make(chan error, 1) | ||
c.wrkPool.Submit(func() { | ||
defer close(errCh) | ||
err := c.getResponseBody(groupCtx, method, methodArgs[method], results[i]) | ||
if err != nil { | ||
c.log.Error("Failed to get response for jsonRPC", | ||
"blockNumber", blockNumber, | ||
"method", method, | ||
"error", err, | ||
) | ||
errCh <- err | ||
} else { | ||
errCh <- nil | ||
} | ||
}) | ||
return <-errCh | ||
}) | ||
} | ||
|
||
if err := group.Wait(); err != nil { | ||
return models.RPCBlock{}, err | ||
} | ||
|
||
txHashes, err := getTransactionHashes(results[0].Bytes()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we start working on getting the transactions once we complete all methods from above ( |
||
if err != nil { | ||
return models.RPCBlock{}, err | ||
} | ||
|
||
c.log.Debug("BlockByNumber", "blockNumber", blockNumber, "txCount", len(txHashes)) | ||
group, groupCtx = errgroup.WithContext(ctx) | ||
for _, tx := range txHashes { | ||
result := c.bufPool.Get().(*bytes.Buffer) | ||
defer c.putBuffer(result) | ||
|
||
results = append(results, result) | ||
group.Go(func() error { | ||
errCh := make(chan error, 1) | ||
c.wrkPool.Submit(func() { | ||
defer close(errCh) | ||
err := c.getResponseBody(groupCtx, "eth_getTransactionReceipt", []any{tx.Hash}, result) | ||
if err != nil { | ||
c.log.Error("Failed to get response for jsonRPC", | ||
"blockNumber", blockNumber, | ||
"method", "eth_getTransactionReceipt", | ||
"txHash", tx.Hash, | ||
"error", err, | ||
) | ||
errCh <- err | ||
} else { | ||
errCh <- nil | ||
} | ||
}) | ||
return <-errCh | ||
}) | ||
} | ||
if err := group.Wait(); err != nil { | ||
return models.RPCBlock{}, err | ||
} | ||
|
||
return c.buildRPCBlockResponse(blockNumber, results) | ||
} | ||
|
||
type transactionHash struct { | ||
Hash string `json:"hash"` | ||
} | ||
|
||
func getTransactionHashes(blockResp []byte) ([]transactionHash, error) { | ||
// minimal parse the block response to extract the transaction hashes | ||
type blockResponse struct { | ||
Result struct { | ||
Transactions []transactionHash `json:"transactions"` | ||
} `json:"result"` | ||
} | ||
var resp blockResponse | ||
err := json.Unmarshal(blockResp, &resp) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to parse eth_getBlockByNumber response: %w", err) | ||
} | ||
return resp.Result.Transactions, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,8 +56,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m | |
results := make([]*bytes.Buffer, len(methods)) | ||
for i, method := range methods { | ||
results[i] = c.bufPool.Get().(*bytes.Buffer) | ||
defer c.bufPool.Put(results[i]) | ||
results[i].Reset() | ||
defer c.putBuffer(results[i]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know if this makes any difference, but here you also defering the |
||
|
||
group.Go(func() error { | ||
errCh := make(chan error, 1) | ||
|
@@ -66,6 +65,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m | |
err := c.getResponseBody(ctx, method, methodArgs[method], results[i]) | ||
if err != nil { | ||
c.log.Error("Failed to get response for jsonRPC", | ||
"blockNumber", blockNumber, | ||
"method", method, | ||
"error", err, | ||
) | ||
|
@@ -82,14 +82,5 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m | |
return models.RPCBlock{}, err | ||
} | ||
|
||
// copy the responses in order | ||
var buffer bytes.Buffer | ||
for _, res := range results { | ||
buffer.Grow(res.Len()) | ||
buffer.ReadFrom(res) | ||
} | ||
return models.RPCBlock{ | ||
BlockNumber: blockNumber, | ||
Payload: buffer.Bytes(), | ||
}, nil | ||
return c.buildRPCBlockResponse(blockNumber, results) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd love us to have something like RxGo (but maintained) or go-streams or similar to make some of this channel + errgroup + ant stuff simpler. Even if we wrote a small utility function that covered the basic pattern we keep using.