Skip to content

Commit

Permalink
services/horizon: map client disconnects as Status 499 instead of 503…
Browse files Browse the repository at this point in the history
… for metrics reporting (#4098)
  • Loading branch information
sreuland authored Nov 30, 2021
1 parent 27f59f3 commit 4d04063
Show file tree
Hide file tree
Showing 12 changed files with 285 additions and 20 deletions.
2 changes: 2 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).

### Changes
* Return inner and outer result codes for fee bump transactions ([4081](https://github.com/stellar/go/pull/4081))
* Generate Http Status code of 499 for Client Disconnects, should propagate into `horizon_http_requests_duration_seconds_count`
metric key with status=499 label. ([4098](horizon_http_requests_duration_seconds_count))

## v2.11.0

Expand Down
18 changes: 15 additions & 3 deletions services/horizon/internal/actions/submit_transaction.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package actions

import (
"context"
"encoding/hex"
"mime"
"net/http"
Expand All @@ -16,8 +17,16 @@ import (
"github.com/stellar/go/xdr"
)

type NetworkSubmitter interface {
Submit(
ctx context.Context,
rawTx string,
envelope xdr.TransactionEnvelope,
hash string) <-chan txsub.Result
}

type SubmitTransactionHandler struct {
Submitter *txsub.System
Submitter NetworkSubmitter
NetworkPassphrase string
CoreStateGetter
}
Expand Down Expand Up @@ -78,7 +87,7 @@ func (handler SubmitTransactionHandler) response(r *http.Request, info envelopeI
}

if result.Err == txsub.ErrCanceled {
return nil, &hProblem.Timeout
return nil, &hProblem.ClientDisconnected
}

switch err := result.Err.(type) {
Expand Down Expand Up @@ -153,6 +162,9 @@ func (handler SubmitTransactionHandler) GetResource(w HeaderWriter, r *http.Requ
case result := <-submission:
return handler.response(r, info, result)
case <-r.Context().Done():
return nil, &hProblem.Timeout
if r.Context().Err() == context.Canceled {
return nil, hProblem.ClientDisconnected
}
return nil, hProblem.Timeout
}
}
93 changes: 93 additions & 0 deletions services/horizon/internal/actions/submit_transaction_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package actions

import (
"context"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"

"github.com/stellar/go/network"
"github.com/stellar/go/services/horizon/internal/corestate"
hProblem "github.com/stellar/go/services/horizon/internal/render/problem"
"github.com/stellar/go/services/horizon/internal/txsub"
"github.com/stellar/go/support/render/problem"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand All @@ -35,6 +40,19 @@ func (m *coreStateGetterMock) GetCoreState() corestate.State {
return a.Get(0).(corestate.State)
}

type networkSubmitterMock struct {
mock.Mock
}

func (m *networkSubmitterMock) Submit(
ctx context.Context,
rawTx string,
envelope xdr.TransactionEnvelope,
hash string) <-chan txsub.Result {
a := m.Called()
return a.Get(0).(chan txsub.Result)
}

func TestStellarCoreNotSynced(t *testing.T) {
mock := &coreStateGetterMock{}
mock.On("GetCoreState").Return(corestate.State{
Expand Down Expand Up @@ -64,3 +82,78 @@ func TestStellarCoreNotSynced(t *testing.T) {
assert.Equal(t, "stale_history", err.(problem.P).Type)
assert.Equal(t, "Historical DB Is Too Stale", err.(problem.P).Title)
}

func TestTimeoutSubmission(t *testing.T) {
mockSubmitChannel := make(chan txsub.Result)

mock := &coreStateGetterMock{}
mock.On("GetCoreState").Return(corestate.State{
Synced: true,
})

mockSubmitter := &networkSubmitterMock{}
mockSubmitter.On("Submit").Return(mockSubmitChannel)

handler := SubmitTransactionHandler{
Submitter: mockSubmitter,
NetworkPassphrase: network.PublicNetworkPassphrase,
CoreStateGetter: mock,
}

form := url.Values{}
form.Set("tx", "AAAAAAGUcmKO5465JxTSLQOQljwk2SfqAJmZSG6JH6wtqpwhAAABLAAAAAAAAAABAAAAAAAAAAEAAAALaGVsbG8gd29ybGQAAAAAAwAAAAAAAAAAAAAAABbxCy3mLg3hiTqX4VUEEp60pFOrJNxYM1JtxXTwXhY2AAAAAAvrwgAAAAAAAAAAAQAAAAAW8Qst5i4N4Yk6l+FVBBKetKRTqyTcWDNSbcV08F4WNgAAAAAN4Lazj4x61AAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABLaqcIQAAAEBKwqWy3TaOxoGnfm9eUjfTRBvPf34dvDA0Nf+B8z4zBob90UXtuCqmQqwMCyH+okOI3c05br3khkH0yP4kCwcE")

request, err := http.NewRequest(
"POST",
"https://horizon.stellar.org/transactions",
strings.NewReader(form.Encode()),
)

require.NoError(t, err)
request.Header.Add("Content-Type", "application/x-www-form-urlencoded")
ctx, cancel := context.WithTimeout(request.Context(), time.Duration(0))
defer cancel()
request = request.WithContext(ctx)

w := httptest.NewRecorder()
_, err = handler.GetResource(w, request)
assert.Error(t, err)
assert.Equal(t, hProblem.Timeout, err)
}

func TestClientDisconnectSubmission(t *testing.T) {
mockSubmitChannel := make(chan txsub.Result)

mock := &coreStateGetterMock{}
mock.On("GetCoreState").Return(corestate.State{
Synced: true,
})

mockSubmitter := &networkSubmitterMock{}
mockSubmitter.On("Submit").Return(mockSubmitChannel)

handler := SubmitTransactionHandler{
Submitter: mockSubmitter,
NetworkPassphrase: network.PublicNetworkPassphrase,
CoreStateGetter: mock,
}

form := url.Values{}
form.Set("tx", "AAAAAAGUcmKO5465JxTSLQOQljwk2SfqAJmZSG6JH6wtqpwhAAABLAAAAAAAAAABAAAAAAAAAAEAAAALaGVsbG8gd29ybGQAAAAAAwAAAAAAAAAAAAAAABbxCy3mLg3hiTqX4VUEEp60pFOrJNxYM1JtxXTwXhY2AAAAAAvrwgAAAAAAAAAAAQAAAAAW8Qst5i4N4Yk6l+FVBBKetKRTqyTcWDNSbcV08F4WNgAAAAAN4Lazj4x61AAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABLaqcIQAAAEBKwqWy3TaOxoGnfm9eUjfTRBvPf34dvDA0Nf+B8z4zBob90UXtuCqmQqwMCyH+okOI3c05br3khkH0yP4kCwcE")

request, err := http.NewRequest(
"POST",
"https://horizon.stellar.org/transactions",
strings.NewReader(form.Encode()),
)

require.NoError(t, err)
request.Header.Add("Content-Type", "application/x-www-form-urlencoded")
ctx, cancel := context.WithCancel(request.Context())
cancel()
request = request.WithContext(ctx)

w := httptest.NewRecorder()
_, err = handler.GetResource(w, request)
assert.Equal(t, hProblem.ClientDisconnected, err)
}
5 changes: 3 additions & 2 deletions services/horizon/internal/httpx/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ func init() {
problem.RegisterError(db2.ErrInvalidOrder, problem.BadRequest)
problem.RegisterError(sse.ErrRateLimited, hProblem.RateLimitExceeded)
problem.RegisterError(context.DeadlineExceeded, hProblem.Timeout)
problem.RegisterError(context.Canceled, hProblem.ServiceUnavailable)
problem.RegisterError(db.ErrCancelled, hProblem.ServiceUnavailable)
problem.RegisterError(context.Canceled, hProblem.ClientDisconnected)
problem.RegisterError(db.ErrCancelled, hProblem.ClientDisconnected)
problem.RegisterError(db.ErrTimeout, hProblem.ServiceUnavailable)
problem.RegisterError(db.ErrConflictWithRecovery, hProblem.ServiceUnavailable)
problem.RegisterError(db.ErrBadConnection, hProblem.ServiceUnavailable)
}
Expand Down
30 changes: 30 additions & 0 deletions services/horizon/internal/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,36 @@ func TestStateMiddleware(t *testing.T) {
}
}

func TestClientDisconnect(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)

request, err := http.NewRequest("GET", "http://localhost/", nil)
tt.Assert.NoError(err)

endpoint := func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

stateMiddleware := &httpx.StateMiddleware{
HorizonSession: tt.HorizonSession(),
NoStateVerification: true,
}
handler := chi.NewRouter()
handler.With(stateMiddleware.Wrap).MethodFunc("GET", "/", endpoint)
w := httptest.NewRecorder()

ctx, cancel := context.WithCancel(request.Context())
defer cancel()
request = request.WithContext(ctx)
// cancel invocation simulates client disconnect in the context
cancel()

handler.ServeHTTP(w, request)
tt.Assert.Equal(499, w.Code)
}

func TestCheckHistoryStaleMiddleware(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
Expand Down
11 changes: 11 additions & 0 deletions services/horizon/internal/render/problem/problem.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ import (

// Well-known and reused problems below:
var (

// ClientDisconnected, represented by a non-standard HTTP status code of 499, which was introduced by
// nginix.org(https://www.nginx.com/resources/wiki/extending/api/http/) as a way to capture this state. Use it as a shortcut
// in your actions.
ClientDisconnected = problem.P{
Type: "client_disconnected",
Title: "Client Disconnected",
Status: 499,
Detail: "The client has closed the connection.",
}

// ServiceUnavailable is a well-known problem type. Use it as a shortcut
// in your actions.
ServiceUnavailable = problem.P{
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/render/problem/problem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestCommonProblems(t *testing.T) {
}{
{"NotFound", problem.NotFound, 404},
{"RateLimitExceeded", RateLimitExceeded, 429},
{"ClientDisconneted", ClientDisconnected, 499},
}

for _, tc := range testCases {
Expand Down
20 changes: 13 additions & 7 deletions services/horizon/internal/txsub/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,8 @@ func (sys *System) Submit(
sys.finish(ctx, hash, response, Result{Err: sr.Err})
return
}

if sys.waitUntilAccountSequence(ctx, db, sourceAddress, uint64(envelope.SeqNum())) {
sys.finish(ctx, hash, response, Result{Err: ErrCanceled})
if err = sys.waitUntilAccountSequence(ctx, db, sourceAddress, uint64(envelope.SeqNum())); err != nil {
sys.finish(ctx, hash, response, Result{Err: err})
return
}

Expand All @@ -194,15 +193,15 @@ func (sys *System) Submit(
}

case <-ctx.Done():
sys.finish(ctx, hash, response, Result{Err: ErrCanceled})
sys.finish(ctx, hash, response, Result{Err: sys.deriveTxSubError(ctx)})
}

return
}

// waitUntilAccountSequence blocks until either the context times out or the sequence number of the
// given source account is greater than or equal to `seq`
func (sys *System) waitUntilAccountSequence(ctx context.Context, db HorizonDB, sourceAddress string, seq uint64) bool {
func (sys *System) waitUntilAccountSequence(ctx context.Context, db HorizonDB, sourceAddress string, seq uint64) error {
timer := time.NewTimer(sys.accountSeqPollInterval)
defer timer.Stop()

Expand All @@ -222,19 +221,26 @@ func (sys *System) waitUntilAccountSequence(ctx context.Context, db HorizonDB, s
Warn("missing sequence number for account")
}
if num >= seq {
return false
return nil
}
}

select {
case <-ctx.Done():
return true
return sys.deriveTxSubError(ctx)
case <-timer.C:
timer.Reset(sys.accountSeqPollInterval)
}
}
}

func (sys *System) deriveTxSubError(ctx context.Context) error {
if ctx.Err() == context.Canceled {
return ErrCanceled
}
return ErrTimeout
}

// Submit submits the provided base64 encoded transaction envelope to the
// network using this submission system.
func (sys *System) submitOnce(ctx context.Context, env string) SubmissionResult {
Expand Down
63 changes: 63 additions & 0 deletions services/horizon/internal/txsub/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,69 @@ func (suite *SystemTestSuite) TestSubmit_Basic() {
assert.False(suite.T(), suite.submitter.WasSubmittedTo)
}

func (suite *SystemTestSuite) TestTimeoutDuringSequnceLoop() {
var cancel context.CancelFunc
suite.ctx, cancel = context.WithTimeout(suite.ctx, time.Duration(0))
defer cancel()

suite.submitter.R = suite.badSeq
suite.db.On("BeginTx", &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}).Return(nil).Once()
suite.db.On("Rollback").Return(nil).Once()
suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash).
Return(sql.ErrNoRows).Once()
suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once()
suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}).
Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil)

r := <-suite.system.Submit(
suite.ctx,
suite.successTx.Transaction.TxEnvelope,
suite.successXDR,
suite.successTx.Transaction.TransactionHash,
)

assert.NotNil(suite.T(), r.Err)
assert.Equal(suite.T(), ErrTimeout, r.Err)
}

func (suite *SystemTestSuite) TestClientDisconnectedDuringSequnceLoop() {
var cancel context.CancelFunc
suite.ctx, cancel = context.WithCancel(suite.ctx)

suite.submitter.R = suite.badSeq
suite.db.On("BeginTx", &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}).Return(nil).Once()
suite.db.On("Rollback").Return(nil).Once()
suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash).
Return(sql.ErrNoRows).Once()
suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once()
suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}).
Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil).
Run(func(args mock.Arguments) {
// simulate client disconnecting while looping on sequnce number check
cancel()
suite.ctx.Deadline()
}).
Once()
suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}).
Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil)

r := <-suite.system.Submit(
suite.ctx,
suite.successTx.Transaction.TxEnvelope,
suite.successXDR,
suite.successTx.Transaction.TransactionHash,
)

assert.NotNil(suite.T(), r.Err)
assert.Equal(suite.T(), ErrCanceled, r.Err)
}

func getMetricValue(metric prometheus.Metric) *dto.Metric {
value := &dto.Metric{}
err := metric.Write(value)
Expand Down
3 changes: 3 additions & 0 deletions support/db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ const (
)

var (
// ErrTimeout is an error returned by Session methods when request has
// taken longer than context's deadline max duration
ErrTimeout = errors.New("canceling statement due to lack of response within timeout period")
// ErrCancelled is an error returned by Session methods when request has
// been cancelled (ex. context cancelled).
ErrCancelled = errors.New("canceling statement due to user request")
Expand Down
Loading

0 comments on commit 4d04063

Please sign in to comment.