Skip to content

Commit

Permalink
Goreplay middleware (#4932)
Browse files Browse the repository at this point in the history
* tools/goreplay-middleware: Add goreplay middleware
* Fix linter errors

---------
Co-authored-by: Bartek Nowotarski <bartek@nowotarski.info>
  • Loading branch information
urvisavla authored Jun 27, 2023
1 parent 090b454 commit 7aafb6d
Show file tree
Hide file tree
Showing 9 changed files with 445 additions and 7 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ require (
cloud.google.com/go/storage v1.10.0 // indirect
github.com/ajg/form v0.0.0-20160822230020-523a5da1a92f // indirect
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/buger/goreplay v1.3.2
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/structs v1.0.0 // indirect
github.com/gavv/monotime v0.0.0-20161010190848-47d58efa6955 // indirect
Expand All @@ -76,7 +77,7 @@ require (
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/klauspost/compress v1.15.0 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/kr/pretty v0.2.0 // indirect
github.com/kr/text v0.1.0 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
Expand All @@ -91,7 +92,6 @@ require (
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1 // indirect
github.com/rs/xhandler v0.0.0-20160618193221-ed27b6fd6521 // indirect
github.com/sergi/go-diff v0.0.0-20161205080420-83532ca1c1ca // indirect
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect
github.com/spf13/cast v0.0.0-20150508191742-4d07383ffe94 // indirect
github.com/spf13/jwalterweatherman v0.0.0-20141219030609-3d60171a6431 // indirect
github.com/stretchr/objx v0.3.0 // indirect
Expand Down
51 changes: 46 additions & 5 deletions go.sum

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions support/log/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package log

import (
"context"
"io"
"os"

"github.com/segmentio/go-loggly"
Expand Down Expand Up @@ -80,6 +81,10 @@ func SetLevel(level logrus.Level) {
DefaultLogger.SetLevel(level)
}

func SetOut(out io.Writer) {
DefaultLogger.entry.Logger.Out = out
}

func WithField(key string, value interface{}) *Entry {
result := DefaultLogger.WithField(key, value)
return result
Expand Down
Empty file.
1 change: 1 addition & 0 deletions tools/goreplay-middleware/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# goreplay-middleware
157 changes: 157 additions & 0 deletions tools/goreplay-middleware/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// The code below is a goreplay middleware used for regression testing current
// vs next Horizon version. The middleware system of goreplay is rather simple:
// it streams one of 3 message types to stdin: request (HTTP headers),
// original response and replayed response. On request we can modify the request
// and send it to stdout but we don't use this feature here: we send request
// to mirroring target as is. Finally, everything printed to stderr is the
// middleware log, this is where we put the information about the request if the
// diff is found.
//
// More information and diagrams about the middlewares can be found here:
// https://github.com/buger/goreplay/wiki/Middleware
package main

import (
"bufio"
"bytes"
"encoding/hex"
"fmt"
"io"
"os"
"time"

"github.com/buger/goreplay/proto"
"github.com/stellar/go/support/log"
)

// maxPerSecond defines how many requests should be checked at max per second
const maxPerSecond = 100

const (
requestType byte = '1'
originalResponseType byte = '2'
replayedResponseType byte = '3'
)

var lastCheck = time.Now()
var reqsCheckedPerSeq = 0
var pendingRequestsAdded, ignoredCount, diffsCount, okCount int64
var pendingRequests = make(map[string]*Request)

func main() {
processAll(os.Stdin, os.Stderr, os.Stdout)
}

func processAll(stdin io.Reader, stderr, stdout io.Writer) {
log.SetOut(stderr)
log.SetLevel(log.InfoLevel)

bufSize := 20 * 1024 * 1024 // 20MB
scanner := bufio.NewScanner(stdin)
buf := make([]byte, bufSize)
scanner.Buffer(buf, bufSize)
var maxPendingRequests = 2000

for scanner.Scan() {
encoded := scanner.Bytes()
buf := make([]byte, len(encoded)/2)
_, err := hex.Decode(buf, encoded)
if err != nil {
os.Stderr.WriteString(fmt.Sprintf("hex.Decode error: %v", err))
continue
}

if err := scanner.Err(); err != nil {
os.Stderr.WriteString(fmt.Sprintf("scanner.Err(): %v\n", err))
}

process(stderr, stdout, buf)

if len(pendingRequests) > maxPendingRequests {
// Around 3-4% of responses is lost (not sure why) so pendingRequests can grow
// indefinitely. Let's just truncate it when it becomes too big.
// There is one gotcha here. Goreplay will still send requests
// (`1` type payloads) even if traffic is rate limited. So if rate
// limit is applied even more requests can be lost. So we should
// use rate limiting implemented here when using middleware rather than
// Goreplay's rate limit.
pendingRequests = make(map[string]*Request)
}
}
}

func process(stderr, stdout io.Writer, buf []byte) {
// First byte indicate payload type:
payloadType := buf[0]
headerSize := bytes.IndexByte(buf, '\n') + 1
header := buf[:headerSize-1]

// Header contains space separated values of: request type, request id, and request start time (or round-trip time for responses)
meta := bytes.Split(header, []byte(" "))
// For each request you should receive 3 payloads (request, response, replayed response) with same request id
reqID := string(meta[1])
payload := buf[headerSize:]

switch payloadType {
case requestType:
if time.Since(lastCheck) > time.Second {
reqsCheckedPerSeq = 0
lastCheck = time.Now()

// Print stats every second
_, _ = os.Stderr.WriteString(fmt.Sprintf(
"middleware stats: pendingRequests=%d requestsAdded=%d ok=%d diffs=%d ignored=%d\n",
len(pendingRequests),
pendingRequestsAdded,
okCount,
diffsCount,
ignoredCount,
))
}

if reqsCheckedPerSeq < maxPerSecond {
pendingRequests[reqID] = &Request{
Headers: payload,
}
pendingRequestsAdded++
reqsCheckedPerSeq++
}

// Emitting data back, without modification
_, err := io.WriteString(stdout, hex.EncodeToString(buf)+"\n")
if err != nil {
_, _ = io.WriteString(stderr, fmt.Sprintf("stdout.WriteString error: %v", err))
}
case originalResponseType:
if req, ok := pendingRequests[reqID]; ok {
// Original response can arrive after mirrored so this should be improved
// instead of ignoring this case.
req.OriginalResponse = payload
}
case replayedResponseType:
if req, ok := pendingRequests[reqID]; ok {
req.MirroredResponse = payload

if req.IsIgnored() {
ignoredCount++
} else {
if !req.ResponseEquals() {
// TODO in the future publish the results to S3 for easier processing
log.WithFields(log.F{
"expected": req.OriginalBody(),
"actual": req.MirroredBody(),
"headers": string(req.Headers),
"path": string(proto.Path(req.Headers)),
}).Info("Mismatch found")
diffsCount++
} else {
okCount++
}
}

delete(pendingRequests, reqID)
}
default:
_, _ = io.WriteString(stderr, "Unknown message type\n")
}
}
49 changes: 49 additions & 0 deletions tools/goreplay-middleware/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"bytes"
"encoding/hex"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func TestProcess(t *testing.T) {
// For 1 type, it returns the same msg to stdout
payload := "1 ID\nGET /ledgers HTTP/1.1\r\nHost: horizon.stellar.org\r\n\r\n"
stdin := strings.NewReader(hex.EncodeToString([]byte(payload)))

stdout := bytes.Buffer{}
stderr := bytes.Buffer{}
processAll(stdin, &stderr, &stdout)

decodedOut, err := hex.DecodeString(strings.TrimRight(stdout.String(), "\n"))
assert.NoError(t, err)
assert.Equal(t, payload, string(decodedOut))
assert.Equal(t, "", stderr.String())

// For 2 type, save the original response
payload = "2 ID\nHeader: true\r\n\r\nBody"
stdin = strings.NewReader(hex.EncodeToString([]byte(payload)))

stdout = bytes.Buffer{}
stderr = bytes.Buffer{}
processAll(stdin, &stderr, &stdout)

assert.Len(t, pendingRequests, 1)
assert.Equal(t, "", stdout.String())
assert.Equal(t, "", stderr.String())

// For 2 type, save the original response
payload = "3 ID\nHeader: true\r\n\r\nBody"
stdin = strings.NewReader(hex.EncodeToString([]byte(payload)))

stdout = bytes.Buffer{}
stderr = bytes.Buffer{}
processAll(stdin, &stderr, &stdout)

assert.Len(t, pendingRequests, 0)
assert.Equal(t, "", stdout.String())
assert.Equal(t, "", stderr.String())
}
99 changes: 99 additions & 0 deletions tools/goreplay-middleware/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package main

import (
"bytes"
"regexp"
"strings"

"github.com/buger/goreplay/proto"
)

var horizonURLs = regexp.MustCompile(`https:\/\/.*?(stellar\.org|127.0.0.1:8000)`)
var findResultMetaXDR = regexp.MustCompile(`"result_meta_xdr":[ ]?"([^"]*)",`)

// removeRegexps contains a list of regular expressions that, when matched,
// will be changed to an empty string. This is done to exclude known
// differences in responses between two Horizon version.
//
// Let's say that next Horizon version adds a new bool field:
// `is_authorized` on account balances list. You want to remove this
// field so it's not reported for each `/accounts/{id}` response.
var removeRegexps = []*regexp.Regexp{}

type replace struct {
regexp *regexp.Regexp
repl string
}

// replaceRegexps works like removeRegexps but replaces data
var replaceRegexps = []replace{}

type Request struct {
Headers []byte
OriginalResponse []byte
MirroredResponse []byte
}

func (r *Request) OriginalBody() string {
return string(proto.Body(r.OriginalResponse))
}

func (r *Request) MirroredBody() string {
return string(proto.Body(r.MirroredResponse))
}

func (r *Request) IsIgnored() bool {
if len(r.OriginalResponse) == 0 {
return true
}

originalLatestLedgerHeader := proto.Header(r.OriginalResponse, []byte("Latest-Ledger"))
mirroredLatestLedgerHeader := proto.Header(r.MirroredResponse, []byte("Latest-Ledger"))

if !bytes.Equal(originalLatestLedgerHeader, mirroredLatestLedgerHeader) {
return true
}

// Responses below are not supported but support can be added with some effort
originalTransferEncodingHeader := proto.Header(r.OriginalResponse, []byte("Transfer-Encoding"))
mirroredTransferEncodingHeader := proto.Header(r.MirroredResponse, []byte("Transfer-Encoding"))
if len(originalTransferEncodingHeader) > 0 ||
len(mirroredTransferEncodingHeader) > 0 {
return true
}

acceptEncodingHeader := proto.Header(r.Headers, []byte("Accept-Encoding"))
if strings.Contains(string(acceptEncodingHeader), "gzip") {
return true
}

acceptHeader := proto.Header(r.Headers, []byte("Accept"))
return strings.Contains(string(acceptHeader), "event-stream")
}

func (r *Request) ResponseEquals() bool {
originalBody := proto.Body(r.OriginalResponse)
mirroredBody := proto.Body(r.MirroredResponse)

return normalizeResponseBody(originalBody) == normalizeResponseBody(mirroredBody)
}

// normalizeResponseBody normalizes body to allow byte-byte comparison like removing
// URLs from _links or tx meta. May require updating on new releases.
func normalizeResponseBody(body []byte) string {
normalizedBody := string(body)
// `result_meta_xdr` can differ between core instances (confirmed this with core team)
normalizedBody = findResultMetaXDR.ReplaceAllString(normalizedBody, "")
// Remove Horizon URL from the _links
normalizedBody = horizonURLs.ReplaceAllString(normalizedBody, "")

for _, reg := range removeRegexps {
normalizedBody = reg.ReplaceAllString(normalizedBody, "")
}

for _, reg := range replaceRegexps {
normalizedBody = reg.regexp.ReplaceAllString(normalizedBody, reg.repl)
}

return normalizedBody
}
Loading

0 comments on commit 7aafb6d

Please sign in to comment.