Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscribe account statuses endpoint #762

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
23 changes: 23 additions & 0 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,3 +329,26 @@ func convertSubscribeOptions(opts ...access.SubscribeOption) *SubscribeConfig {
}
return subsConf
}

func (c *Client) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startBlockHeight uint64,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromStartHeight(ctx, startBlockHeight, filter)
}

func (c *Client) SubscribeAccountStatusesFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromStartBlockID(ctx, startBlockID, filter)
}

func (c *Client) SubscribeAccountStatusesFromLatestBlock(
ctx context.Context,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromLatestBlock(ctx, filter)
}
38 changes: 38 additions & 0 deletions access/grpc/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"time"

"github.com/onflow/flow/protobuf/go/flow/executiondata"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/onflow/cadence"
Expand Down Expand Up @@ -76,6 +77,43 @@ func MessageToAccount(m *entities.Account) (flow.Account, error) {
}, nil
}

func MessageToAccountStatus(m *executiondata.SubscribeAccountStatusesResponse) (flow.AccountStatus, error) {
if m == nil {
return flow.AccountStatus{}, ErrEmptyMessage
}

results, err := MessageToAccountStatusResults(m.GetResults())
if err != nil {
return flow.AccountStatus{}, fmt.Errorf("error converting results: %w", err)
}

return flow.AccountStatus{
BlockID: MessageToIdentifier(m.GetBlockId()),
BlockHeight: m.GetBlockHeight(),
MessageIndex: m.GetMessageIndex(),
Results: results,
}, nil
}

func MessageToAccountStatusResults(m []*executiondata.SubscribeAccountStatusesResponse_Result) ([]*flow.AccountStatusResult, error) {
results := make([]*flow.AccountStatusResult, len(m))
var emptyOptions []jsoncdc.Option

for i, r := range m {
events, err := MessagesToEvents(r.GetEvents(), emptyOptions)
if err != nil {
return nil, fmt.Errorf("error converting events: %w", err)
}

results[i] = &flow.AccountStatusResult{
Address: MessageToIdentifier(r.GetAddress()),
Events: events,
}
}

return results, nil
}

func AccountKeyToMessage(a *flow.AccountKey) *entities.AccountKey {
return &entities.AccountKey{
Index: uint32(a.Index),
Expand Down
135 changes: 135 additions & 0 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,3 +1129,138 @@ func (c *BaseClient) subscribeEvents(

return sub, errChan, nil
}

func (c *BaseClient) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startHeight uint64,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromStartHeightRequest{
StartBlockHeight: startHeight,
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromStartHeight(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan)
}()

return accountStatutesChan, errChan, nil
}

func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromStartBlockIDRequest{
StartBlockId: startBlockID.Bytes(),
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromStartBlockID(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan)
}()

return accountStatutesChan, errChan, nil
}

func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock(
ctx context.Context,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromLatestBlockRequest{
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromLatestBlock(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan)
}()

return accountStatutesChan, errChan, nil
}

func receiveAccountStatusesFromClient[Client interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using Client as the name is a little confusing, especially within a client library. how about breaking this out to a stand along type since it's used in a bunch of places, and renaming it to Stream?

Recv() (*executiondata.SubscribeAccountStatusesResponse, error)
}](
ctx context.Context,
client Client,
accountStatutesChan chan<- flow.AccountStatus,
errChan chan<- error,
) {
sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

for {
accountStatusResponse, err := client.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}

sendErr(fmt.Errorf("error receiving account status: %w", err))
return
}

accountStatus, err := convert.MessageToAccountStatus(accountStatusResponse)
if err != nil {
sendErr(fmt.Errorf("error converting message to account status: %w", err))
return
}

illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-ctx.Done():
return
case accountStatutesChan <- accountStatus:
}
}
}
Loading
Loading