diff --git a/access/handler.go b/access/handler.go index 3007fc6f691..d4205d3d481 100644 --- a/access/handler.go +++ b/access/handler.go @@ -1435,15 +1435,19 @@ func (h *Handler) SendAndSubscribeTransactionStatuses( messageIndex := counters.NewMonotonousCounter(0) return subscription.HandleSubscription(sub, func(txResults []*TransactionResult) error { for i := range txResults { - value := messageIndex.Increment() + index := messageIndex.Value() err = stream.Send(&access.SendAndSubscribeTransactionStatusesResponse{ TransactionResults: TransactionResultToMessage(txResults[i]), - MessageIndex: value, + MessageIndex: index, }) if err != nil { return rpc.ConvertError(err, "could not send response", codes.Internal) } + + if ok := messageIndex.Set(index + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } } return nil diff --git a/engine/access/state_stream/backend/handler.go b/engine/access/state_stream/backend/handler.go index 11281da9486..8ed2ca8051d 100644 --- a/engine/access/state_stream/backend/handler.go +++ b/engine/access/state_stream/backend/handler.go @@ -381,7 +381,7 @@ func (h *Handler) handleEventsResponse(send sendSubscribeEventsResponseFunc, hea return status.Errorf(codes.Internal, "could not convert events to entity: %v", err) } - index := messageIndex.Increment() + index := messageIndex.Value() err = send(&executiondata.SubscribeEventsResponse{ BlockHeight: resp.Height, @@ -394,6 +394,10 @@ func (h *Handler) handleEventsResponse(send sendSubscribeEventsResponseFunc, hea return rpc.ConvertError(err, "could not send response", codes.Internal) } + if ok := messageIndex.Set(index + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } + return nil } } @@ -495,7 +499,7 @@ func (h *Handler) handleAccountStatusesResponse( return err } - index := messageIndex.Increment() + index := messageIndex.Value() err = send(&executiondata.SubscribeAccountStatusesResponse{ BlockId: convert.IdentifierToMessage(resp.BlockID), @@ -507,6 +511,10 @@ func (h *Handler) handleAccountStatusesResponse( return rpc.ConvertError(err, "could not send response", codes.Internal) } + if ok := messageIndex.Set(index + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } + return nil } } diff --git a/integration/tests/access/cohort1/access_api_test.go b/integration/tests/access/cohort1/access_api_test.go index f070c1a4140..6470ced35b2 100644 --- a/integration/tests/access/cohort1/access_api_test.go +++ b/integration/tests/access/cohort1/access_api_test.go @@ -319,7 +319,7 @@ func (s *AccessAPISuite) TestSendAndSubscribeTransactionStatuses() { }) s.Require().NoError(err) - expectedCounter := uint64(1) + expectedCounter := uint64(0) lastReportedTxStatus := entities.TransactionStatus_UNKNOWN var txID sdk.Identifier