Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscribe account statuses endpoint #762

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
23 changes: 23 additions & 0 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,26 @@ func convertSubscribeOptions(opts ...access.SubscribeOption) *SubscribeConfig {
}
return subsConf
}

func (c *Client) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startBlockHeight uint64,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromStartHeight(ctx, startBlockHeight, filter)
}

func (c *Client) SubscribeAccountStatusesFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromStartBlockID(ctx, startBlockID, filter)
}

func (c *Client) SubscribeAccountStatusesFromLatestBlock(
ctx context.Context,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromLatestBlock(ctx, filter)
}
38 changes: 38 additions & 0 deletions access/grpc/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"time"

"github.com/onflow/flow/protobuf/go/flow/executiondata"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/onflow/cadence"
Expand Down Expand Up @@ -76,6 +77,43 @@ func MessageToAccount(m *entities.Account) (flow.Account, error) {
}, nil
}

func MessageToAccountStatus(m *executiondata.SubscribeAccountStatusesResponse) (flow.AccountStatus, error) {
if m == nil {
return flow.AccountStatus{}, ErrEmptyMessage
}

results, err := MessageToAccountStatusResults(m.GetResults())
if err != nil {
return flow.AccountStatus{}, fmt.Errorf("error converting results: %w", err)
}

return flow.AccountStatus{
BlockID: MessageToIdentifier(m.GetBlockId()),
BlockHeight: m.GetBlockHeight(),
MessageIndex: m.GetMessageIndex(),
Results: results,
}, nil
}

func MessageToAccountStatusResults(m []*executiondata.SubscribeAccountStatusesResponse_Result) ([]*flow.AccountStatusResult, error) {
results := make([]*flow.AccountStatusResult, len(m))
var emptyOptions []jsoncdc.Option

for i, r := range m {
events, err := MessagesToEvents(r.GetEvents(), emptyOptions)
if err != nil {
return nil, fmt.Errorf("error converting events: %w", err)
}

results[i] = &flow.AccountStatusResult{
Address: MessageToIdentifier(r.GetAddress()),
Events: events,
}
}

return results, nil
}

func AccountKeyToMessage(a *flow.AccountKey) *entities.AccountKey {
return &entities.AccountKey{
Index: uint32(a.Index),
Expand Down
158 changes: 158 additions & 0 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"errors"
"fmt"
"io"
"sync/atomic"

"github.com/onflow/flow/protobuf/go/flow/entities"
"google.golang.org/grpc"
Expand Down Expand Up @@ -84,6 +85,7 @@ type BaseClient struct {
close func() error
jsonOptions []json.Option
eventEncoding flow.EventEncodingVersion
messageIndex atomic.Uint64
}

// NewBaseClient creates a new gRPC handler for network communication.
Expand Down Expand Up @@ -1296,3 +1298,159 @@ func receiveBlocksFromClient[Client interface {
}
}
}

func (c *BaseClient) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startHeight uint64,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromStartHeightRequest{
StartBlockHeight: startHeight,
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromStartHeight(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.messageIndex)
}()

return accountStatutesChan, errChan, nil
}

func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromStartBlockIDRequest{
StartBlockId: startBlockID.Bytes(),
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromStartBlockID(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.messageIndex)
}()

return accountStatutesChan, errChan, nil
}

func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock(
ctx context.Context,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromLatestBlockRequest{
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromLatestBlock(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.messageIndex)
}()

return accountStatutesChan, errChan, nil
}

func receiveAccountStatusesFromClient[Client interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using Client as the name is a little confusing, especially within a client library. how about breaking this out to a stand along type since it's used in a bunch of places, and renaming it to Stream?

Recv() (*executiondata.SubscribeAccountStatusesResponse, error)
}](
ctx context.Context,
client Client,
accountStatutesChan chan<- flow.AccountStatus,
errChan chan<- error,
previousMsgIndex *atomic.Uint64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message index is specific to a stream. So when you start a new stream, the responses start with message index 0 (or 1, I don't remember how it started), then increment.

I think it's fine to track it only within this function as a regular uint64 value. There should not be any concurrent access.

) {
sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

for {
accountStatusResponse, err := client.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}

sendErr(fmt.Errorf("error receiving account status: %w", err))
return
}

accountStatus, err := convert.MessageToAccountStatus(accountStatusResponse)
if err != nil {
sendErr(fmt.Errorf("error converting message to account status: %w", err))
return
}

illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
if err = checkAndIncrementMessageIndex(previousMsgIndex, accountStatus.MessageIndex); err != nil {
sendErr(fmt.Errorf("error checking message index. messages are not ordered: %w", err))
return
}

select {
case <-ctx.Done():
return
case accountStatutesChan <- accountStatus:
}
}
}

func checkAndIncrementMessageIndex(previousMessageIndex *atomic.Uint64, currentMessageIndex uint64) error {
Copy link
Contributor Author

@illia-malachyn illia-malachyn Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peterargue is this implementation good enough? It might be good to add more tests for different msg indexes. However, I'm not sure what cases to cover. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can simply it a bit by tracking "next" instead of the current.
e.g.

next := 0 // since we start at 0, the first expected should be 0
for {
	if msg.MessageIndex != next {
		return err
	}
	next = msg.MessageIndex + 1
}

local := previousMessageIndex.Load()

if local == 0 && currentMessageIndex == 0 {
return nil
}

if currentMessageIndex != local+1 {
return errors.New("current message index is not exactly one larger than the previous one")
}

previousMessageIndex.Add(1)
return nil
}
Loading
Loading