diff --git a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go index 2ccbc3c7f2a..4a661874d0e 100644 --- a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go @@ -3,19 +3,19 @@ package data_providers import ( "context" "fmt" - "github.com/onflow/flow-go/engine/access/rest/common/parser" - "github.com/onflow/flow-go/engine/access/subscription" - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module/counters" - "github.com/onflow/flow/protobuf/go/flow/entities" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "strconv" "github.com/rs/zerolog" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/onflow/flow-go/access" + "github.com/onflow/flow-go/engine/access/rest/common/parser" "github.com/onflow/flow-go/engine/access/rest/websockets/models" + "github.com/onflow/flow-go/engine/access/subscription" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/counters" + + "github.com/onflow/flow/protobuf/go/flow/entities" ) // sendTransactionStatusesArguments contains the arguments required for sending tx and subscribing to transaction statuses @@ -83,18 +83,18 @@ func (p *SendTransactionStatusesDataProvider) createSubscription( // No errors are expected during normal operations. func (p *SendTransactionStatusesDataProvider) handleResponse() func(txResults []*access.TransactionResult) error { - messageIndex := counters.NewMonotonousCounter(1) + messageIndex := counters.NewMonotonousCounter(0) return func(txResults []*access.TransactionResult) error { - index := messageIndex.Value() if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) } + index := messageIndex.Value() p.send <- &models.TransactionStatusesResponse{ TransactionResults: txResults, - MessageIndex: strconv.FormatUint(index, 10), + MessageIndex: index, } return nil diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go index c73a193da4b..7e5e993dd23 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -3,7 +3,6 @@ package data_providers import ( "context" "fmt" - "strconv" "github.com/rs/zerolog" "google.golang.org/grpc/codes" @@ -94,7 +93,7 @@ func (p *TransactionStatusesDataProvider) createSubscription( // // No errors are expected during normal operations. func (p *TransactionStatusesDataProvider) handleResponse() func(txResults []*access.TransactionResult) error { - messageIndex := counters.NewMonotonousCounter(1) + messageIndex := counters.NewMonotonousCounter(0) return func(txResults []*access.TransactionResult) error { @@ -105,7 +104,7 @@ func (p *TransactionStatusesDataProvider) handleResponse() func(txResults []*acc p.send <- &models.TransactionStatusesResponse{ TransactionResults: txResults, - MessageIndex: strconv.FormatUint(index, 10), + MessageIndex: index, } return nil diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go index 723e866ca74..11b36d14cb1 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go @@ -228,3 +228,82 @@ func invalidTransactionStatusesArgumentsTestCases() []testErrType { }, } } + +// TestMessageIndexTransactionStatusesProviderResponse_HappyPath tests that MessageIndex values in response are strictly increasing. +func (s *TransactionStatusesProviderSuite) TestMessageIndexTransactionStatusesProviderResponse_HappyPath() { + ctx := context.Background() + send := make(chan interface{}, 10) + topic := TransactionStatusesTopic + txStatusesCount := 4 + + // Create a channel to simulate the subscription's account statuses channel + txStatusesChan := make(chan interface{}) + + // Create a mock subscription and mock the channel + sub := ssmock.NewSubscription(s.T()) + sub.On("Channel").Return((<-chan interface{})(txStatusesChan)) + sub.On("Err").Return(nil) + + s.api.On( + "SubscribeTransactionStatusesFromStartBlockID", + mock.Anything, + mock.Anything, + mock.Anything, + entities.EventEncodingVersion_JSON_CDC_V0, + ).Return(sub) + + arguments := + map[string]interface{}{ + "start_block_id": s.rootBlock.ID().String(), + } + + // Create the TransactionStatusesDataProvider instance + provider, err := NewTransactionStatusesDataProvider( + ctx, + s.log, + s.api, + topic, + arguments, + send, + ) + s.Require().NotNil(provider) + s.Require().NoError(err) + + // Run the provider in a separate goroutine to simulate subscription processing + go func() { + err = provider.Run() + s.Require().NoError(err) + }() + + // Simulate emitting data to the еч statuses channel + go func() { + defer close(txStatusesChan) // Close the channel when done + + for i := 0; i < txStatusesCount; i++ { + txStatusesChan <- []*access.TransactionResult{} + } + }() + + // Collect responses + var responses []*models.TransactionStatusesResponse + for i := 0; i < txStatusesCount; i++ { + res := <-send + txStatusesRes, ok := res.(*models.TransactionStatusesResponse) + s.Require().True(ok, "Expected *models.TransactionStatusesResponse, got %T", res) + responses = append(responses, txStatusesRes) + } + + // Verifying that indices are starting from 0 + s.Require().Equal(uint64(0), responses[0].MessageIndex, "Expected MessageIndex to start with 0") + + // Verifying that indices are strictly increasing + for i := 1; i < len(responses); i++ { + prevIndex := responses[i-1].MessageIndex + currentIndex := responses[i].MessageIndex + s.Require().Equal(prevIndex+1, currentIndex, "Expected MessageIndex to increment by 1") + } + + // Ensure the provider is properly closed after the test + provider.Close() + +} diff --git a/engine/access/rest/websockets/models/tx_statuses_model.go b/engine/access/rest/websockets/models/tx_statuses_model.go index 55b9d3bdfdd..32754a06603 100644 --- a/engine/access/rest/websockets/models/tx_statuses_model.go +++ b/engine/access/rest/websockets/models/tx_statuses_model.go @@ -7,5 +7,5 @@ import ( // TransactionStatusesResponse is the response message for 'events' topic. type TransactionStatusesResponse struct { TransactionResults []*access.TransactionResult `json:"transaction_results"` - MessageIndex string `json:"message_index"` + MessageIndex uint64 `json:"message_index"` }