Skip to content

Commit

Permalink
Modify SubmitBlock to SubmitBlocks in DA interace (cosmos#1083)
Browse files Browse the repository at this point in the history
<!--
Please read and fill out this form before submitting your PR.

Please make sure you have reviewed our contributors guide before
submitting your
first PR.
-->

## Overview

Resolves: cosmos#1080, cosmos#1084 

Adding DA Testing PR WIP: cosmos#1086 
<!-- 
Please provide an explanation of the PR, including the appropriate
context,
background, goal, and rationale. If there is an issue with this
information,
please provide a tl;dr and link the issue. 
-->

## Checklist

<!-- 
Please complete the checklist to ensure that the PR is ready to be
reviewed.

IMPORTANT:
PRs should be left in Draft until the below checklist is completed.
-->

- [x] New and updated code has appropriate documentation
- [ ] New and updated code has new and/or updated testing - 1086 adds
this
- [x] Required CI checks are passing
- [x] Visual proof for any user facing features like CLI or
documentation updates
- [x] Linked issues closed with keywords
  • Loading branch information
Manav-Aggarwal authored Jul 19, 2023
1 parent f40c7c6 commit a836166
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 176 deletions.
2 changes: 1 addition & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func (m *Manager) submitBlockToDA(ctx context.Context, block *types.Block) error
submitted := false
backoff := initialBackoff
for attempt := 1; ctx.Err() == nil && !submitted && attempt <= maxSubmitAttempts; attempt++ {
res := m.dalc.SubmitBlock(ctx, block)
res := m.dalc.SubmitBlocks(ctx, []*types.Block{block})
if res.Code == da.StatusSuccess {
m.logger.Info("successfully submitted Rollkit block to DA layer", "rollkitHeight", block.SignedHeader.Header.Height(), "daHeight", res.DAHeight)
submitted = true
Expand Down
47 changes: 24 additions & 23 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,33 +74,34 @@ func (c *DataAvailabilityLayerClient) Stop() error {
return nil
}

// SubmitBlock submits a block to DA layer.
func (c *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *types.Block) da.ResultSubmitBlock {
data, err := block.MarshalBinary()
if err != nil {
return da.ResultSubmitBlock{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
},
// SubmitBlocks submits blocks to DA layer.
func (c *DataAvailabilityLayerClient) SubmitBlocks(ctx context.Context, blocks []*types.Block) da.ResultSubmitBlocks {
blobs := make([]*blob.Blob, len(blocks))
for blockIndex, block := range blocks {
data, err := block.MarshalBinary()
if err != nil {
return da.ResultSubmitBlocks{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
},
}
}
}

blockBlob, err := blob.NewBlobV0(c.namespace.Bytes(), data)
if err != nil {
return da.ResultSubmitBlock{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
},
blockBlob, err := blob.NewBlobV0(c.namespace.Bytes(), data)
if err != nil {
return da.ResultSubmitBlocks{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
},
}
}
blobs[blockIndex] = blockBlob
}

blobs := []*blob.Blob{blockBlob}

txResponse, err := c.rpc.State.SubmitPayForBlob(ctx, math.NewInt(c.config.Fee), c.config.GasLimit, blobs)
if err != nil {
return da.ResultSubmitBlock{
return da.ResultSubmitBlocks{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
Expand All @@ -113,15 +114,15 @@ func (c *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *ty
"daHeight", txResponse.Height, "daTxHash", txResponse.TxHash)

if txResponse.Code != 0 {
return da.ResultSubmitBlock{
return da.ResultSubmitBlocks{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: fmt.Sprintf("Codespace: '%s', Code: %d, Message: %s", txResponse.Codespace, txResponse.Code, txResponse.RawLog),
},
}
}

return da.ResultSubmitBlock{
return da.ResultSubmitBlocks{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
Message: "tx hash: " + txResponse.TxHash,
Expand Down
2 changes: 1 addition & 1 deletion da/celestia/mock/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (s *Server) rpc(w http.ResponseWriter, r *http.Request) {
return
}

res := s.mock.SubmitBlock(r.Context(), &block)
res := s.mock.SubmitBlocks(r.Context(), []*types.Block{&block})
resp := &response{
Jsonrpc: "2.0",
Result: &sdk.TxResponse{
Expand Down
8 changes: 4 additions & 4 deletions da/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type BaseResult struct {
DAHeight uint64
}

// ResultSubmitBlock contains information returned from DA layer after block submission.
type ResultSubmitBlock struct {
// ResultSubmitBlocks contains information returned from DA layer after blocks submission.
type ResultSubmitBlocks struct {
BaseResult
// Not sure if this needs to be bubbled up to other
// parts of Rollkit.
Expand Down Expand Up @@ -79,10 +79,10 @@ type DataAvailabilityLayerClient interface {
// Stop is called once, when DataAvailabilityLayerClient is no longer needed.
Stop() error

// SubmitBlock submits the passed in block to the DA layer.
// SubmitBlocks submits the passed in blocks to the DA layer.
// This should create a transaction which (potentially)
// triggers a state transition in the DA layer.
SubmitBlock(ctx context.Context, block *types.Block) ResultSubmitBlock
SubmitBlocks(ctx context.Context, blocks []*types.Block) ResultSubmitBlocks
}

// BlockRetriever is additional interface that can be implemented by Data Availability Layer Client that is able to retrieve
Expand Down
25 changes: 16 additions & 9 deletions da/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/rollkit/rollkit/log"
"github.com/rollkit/rollkit/types"
"github.com/rollkit/rollkit/types/pb/dalc"
"github.com/rollkit/rollkit/types/pb/rollkit"
)

// DataAvailabilityLayerClient is a generic client that proxies all DA requests via gRPC.
Expand Down Expand Up @@ -74,21 +75,27 @@ func (d *DataAvailabilityLayerClient) Stop() error {
return d.conn.Close()
}

// SubmitBlock proxies SubmitBlock request to gRPC server.
func (d *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *types.Block) da.ResultSubmitBlock {
bp, err := block.ToProto()
if err != nil {
return da.ResultSubmitBlock{
BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()},
// SubmitBlocks proxies SubmitBlocks request to gRPC server.
func (d *DataAvailabilityLayerClient) SubmitBlocks(ctx context.Context, blocks []*types.Block) da.ResultSubmitBlocks {
bps := make([]*rollkit.Block, len(blocks))
// convert blocks to protobuf
for i, block := range blocks {
bp, err := block.ToProto()
if err != nil {
return da.ResultSubmitBlocks{
BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()},
}
}
bps[i] = bp
}
resp, err := d.client.SubmitBlock(ctx, &dalc.SubmitBlockRequest{Block: bp})

resp, err := d.client.SubmitBlocks(ctx, &dalc.SubmitBlocksRequest{Blocks: bps})
if err != nil {
return da.ResultSubmitBlock{
return da.ResultSubmitBlocks{
BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()},
}
}
return da.ResultSubmitBlock{
return da.ResultSubmitBlocks{
BaseResult: da.BaseResult{
Code: da.StatusCode(resp.Result.Code),
Message: resp.Result.Message,
Expand Down
19 changes: 12 additions & 7 deletions da/grpc/mockserv/mockserv.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,19 @@ type mockImpl struct {
mock mock.DataAvailabilityLayerClient
}

func (m *mockImpl) SubmitBlock(ctx context.Context, request *dalc.SubmitBlockRequest) (*dalc.SubmitBlockResponse, error) {
var b types.Block
err := b.FromProto(request.Block)
if err != nil {
return nil, err
func (m *mockImpl) SubmitBlocks(ctx context.Context, request *dalc.SubmitBlocksRequest) (*dalc.SubmitBlocksResponse, error) {
blocks := make([]*types.Block, len(request.Blocks))
for i := range request.Blocks {
var b types.Block
err := b.FromProto(request.Blocks[i])
if err != nil {
return nil, err
}
blocks[i] = &b
}
resp := m.mock.SubmitBlock(ctx, &b)
return &dalc.SubmitBlockResponse{

resp := m.mock.SubmitBlocks(ctx, blocks)
return &dalc.SubmitBlocksResponse{
Result: &dalc.DAResponse{
Code: dalc.StatusCode(resp.Code),
Message: resp.Message,
Expand Down
36 changes: 19 additions & 17 deletions da/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,30 +132,32 @@ func (m *DataAvailabilityLayerClient) GetHeightByHeader(dah *core.DataAvailabili
return 0
}

// SubmitBlock submits the passed in block to the DA layer.
// SubmitBlocks submits the passed in blocks to the DA layer.
// This should create a transaction which (potentially)
// triggers a state transition in the DA layer.
func (m *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *types.Block) da.ResultSubmitBlock {
func (m *DataAvailabilityLayerClient) SubmitBlocks(ctx context.Context, blocks []*types.Block) da.ResultSubmitBlocks {
daHeight := atomic.LoadUint64(&m.daHeight)
m.logger.Debug("Submitting block to DA layer!", "height", block.SignedHeader.Header.Height(), "dataLayerHeight", daHeight)

hash := block.SignedHeader.Header.Hash()
blob, err := block.MarshalBinary()
if err != nil {
return da.ResultSubmitBlock{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}
for _, block := range blocks {
m.logger.Debug("Submitting blocks to DA layer!", "height", block.SignedHeader.Header.Height(), "dataLayerHeight", daHeight)

err = m.dalcKV.Put(ctx, getKey(daHeight, uint64(block.SignedHeader.Header.Height())), hash[:])
if err != nil {
return da.ResultSubmitBlock{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}
hash := block.SignedHeader.Header.Hash()
blob, err := block.MarshalBinary()
if err != nil {
return da.ResultSubmitBlocks{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}

err = m.dalcKV.Put(ctx, ds.NewKey(hex.EncodeToString(hash[:])), blob)
if err != nil {
return da.ResultSubmitBlock{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}
err = m.dalcKV.Put(ctx, getKey(daHeight, uint64(block.SignedHeader.Header.Height())), hash[:])
if err != nil {
return da.ResultSubmitBlocks{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}

return da.ResultSubmitBlock{
err = m.dalcKV.Put(ctx, ds.NewKey(hex.EncodeToString(hash[:])), blob)
if err != nil {
return da.ResultSubmitBlocks{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}
}
return da.ResultSubmitBlocks{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
Message: "OK",
Expand Down
2 changes: 1 addition & 1 deletion da/test/da_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func doTestRetrieve(t *testing.T, dalc da.DataAvailabilityLayerClient) {

for i := uint64(0); i < 100; i++ {
b := getRandomBlock(i, rand.Int()%20) //nolint:gosec
resp := dalc.SubmitBlock(ctx, b)
resp := dalc.SubmitBlocks(ctx, []*types.Block{b})
assert.Equal(da.StatusSuccess, resp.Code, resp.Message)
time.Sleep(time.Duration(rand.Int63() % mockDaBlockTime.Milliseconds())) //nolint:gosec

Expand Down
8 changes: 4 additions & 4 deletions proto/dalc/dalc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ message DAResponse {
uint64 da_height = 3 [(gogoproto.customname) = "DAHeight"];
}

message SubmitBlockRequest {
rollkit.Block block = 1;
message SubmitBlocksRequest {
repeated rollkit.Block blocks = 1;
}

message SubmitBlockResponse {
message SubmitBlocksResponse {
DAResponse result = 1;
}

Expand All @@ -36,6 +36,6 @@ message RetrieveBlocksResponse {
}

service DALCService {
rpc SubmitBlock(SubmitBlockRequest) returns (SubmitBlockResponse) {}
rpc SubmitBlocks(SubmitBlocksRequest) returns (SubmitBlocksResponse) {}
rpc RetrieveBlocks(RetrieveBlocksRequest) returns (RetrieveBlocksResponse) {}
}
Loading

0 comments on commit a836166

Please sign in to comment.