Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Commit

Permalink
Don't duplicate log entries for replay
Browse files Browse the repository at this point in the history
Previously we weren't deleting our item from the unbounds when we bound
it in response to detail statements. This leads to a duplication of
items that this commit tests for and fixes.
  • Loading branch information
lawrencejones committed Mar 5, 2019
1 parent 4d621d6 commit 7ef5565
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 9 deletions.
7 changes: 4 additions & 3 deletions pkg/pgreplay/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func ParseItem(logline string, unbounds map[SessionID]*Execute, buffer []byte) (
// for this session, as this log line will confirm the unbound query has no parameters.
if strings.HasPrefix(msg, LogDuration) {
if unbound, ok := unbounds[details.SessionID]; ok {
delete(unbounds, details.SessionID)
return unbound.Bind(nil), nil
}

Expand All @@ -139,7 +140,7 @@ func ParseItem(logline string, unbounds map[SessionID]*Execute, buffer []byte) (

// LOG: statement: select pg_reload_conf();
if strings.HasPrefix(msg, LogStatement) {
return &Statement{details, strings.TrimPrefix(msg, LogStatement)}, nil
return Statement{details, strings.TrimPrefix(msg, LogStatement)}, nil
}

// LOG: execute <unnamed>: select pg_sleep($1)
Expand Down Expand Up @@ -200,12 +201,12 @@ func ParseItem(logline string, unbounds map[SessionID]*Execute, buffer []byte) (

// LOG: connection authorized: user=postgres database=postgres
if strings.HasPrefix(msg, LogConnectionAuthorized) {
return &Connect{details}, nil
return Connect{details}, nil
}

// LOG: disconnection: session time: 0:00:03.861 user=postgres database=postgres host=192.168.99.1 port=51529
if strings.HasPrefix(msg, LogConnectionDisconnect) {
return &Disconnect{details}, nil
return Disconnect{details}, nil
}

// LOG: connection received: host=192.168.99.1 port=52188
Expand Down
85 changes: 84 additions & 1 deletion pkg/pgreplay/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,94 @@ package pgreplay

import (
"strings"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
_ "github.com/onsi/gomega/gstruct"
)

var time20190225, _ = time.Parse(time.RFC3339, "2019-02-25T15:08:27.222+00:00")

var _ = Describe("Parse", func() {
DescribeTable("Parses",
func(input string, expected []Item) {
var items = []Item{}
itemsChan, errs, done := Parse(strings.NewReader(input))
go func() {
for _ = range errs {
// no-op, just drain the channel
}
}()

for item := range itemsChan {
if item != nil {
items = append(items, item)
}
}

Eventually(done).Should(BeClosed())
Expect(len(items)).To(Equal(len(expected)))

for idx, item := range items {
Expect(item).To(BeEquivalentTo(expected[idx]))
}
},
Entry(
"Extended protocol with duration logs",
`
2019-02-25 15:08:27.232 GMT|[unknown]|[unknown]|5c7404eb.d6bd|LOG: connection received: host=127.0.0.1 port=59103
2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: connection authorized: user=alice database=pgreplay_test
2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: duration: 0.968 ms parse <unnamed>: select t.oid
2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: duration: 1.100 ms bind <unnamed>: select t.oid
2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: execute <unnamed>: select t.oid
2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: duration: 0.326 ms
2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: duration: 0.042 ms parse <unnamed>: insert into logs (author, message) ($1, $2)
2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: duration: 0.045 ms bind <unnamed>: insert into logs (author, message) ($1, $2)
2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|DETAIL: parameters: $1 = 'alice', $2 = 'bob'
2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: execute <unnamed>: insert into logs (author, message) ($1, $2)
2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|DETAIL: parameters: $1 = 'alice', $2 = 'bob'
2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: duration: 0.042 ms`,
[]Item{
Connect{
Details{
Timestamp: time20190225,
SessionID: "5c7404eb.d6bd",
User: "alice",
Database: "pgreplay_test",
},
},
BoundExecute{
Execute: &Execute{
Details: Details{
Timestamp: time20190225,
SessionID: "5c7404eb.d6bd",
User: "alice",
Database: "pgreplay_test",
},
Query: "select t.oid",
},
Parameters: []interface{}{},
},
BoundExecute{
Execute: &Execute{
Details: Details{
Timestamp: time20190225,
SessionID: "5c7404eb.d6bd",
User: "alice",
Database: "pgreplay_test",
},
Query: "insert into logs (author, message) ($1, $2)",
},
Parameters: []interface{}{"alice", "bob"},
},
},
),
)
})

var _ = Describe("ParseBindParameters", func() {
DescribeTable("Parses",
func(input string, expected []interface{}) {
Expand Down Expand Up @@ -44,7 +126,8 @@ var _ = Describe("LogScanner", func() {
),
Entry(
"Multiple lines",
`2010-12-31 10:59:52.243 UTC|postgres
`
2010-12-31 10:59:52.243 UTC|postgres
2010-12-31 10:59:53.000 UTC|paysvc`,
[]string{
`2010-12-31 10:59:52.243 UTC|postgres`,
Expand Down
10 changes: 5 additions & 5 deletions pkg/pgreplay/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ type Item interface {
}

type Details struct {
Timestamp time.Time
SessionID SessionID
User string
Database string
Timestamp time.Time `json:"timestamp"`
SessionID SessionID `json:"session_id"`
User string `json:"user"`
Database string `json:"database"`
}

func (e Details) GetTimestamp() time.Time { return e.Timestamp }
Expand All @@ -51,7 +51,7 @@ func (_ Disconnect) Handle(conn *pgx.Conn) error {

type Statement struct {
Details
Query string
Query string `json:"query"`
}

func (s Statement) Handle(conn *pgx.Conn) error {
Expand Down

0 comments on commit 7ef5565

Please sign in to comment.